Skip to content

Commit

Permalink
Verify Writer Cluster Connections (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
justing-bq authored and yanw-bq committed May 17, 2023
1 parent 50c2c04 commit d17c37a
Show file tree
Hide file tree
Showing 9 changed files with 262 additions and 63 deletions.
6 changes: 3 additions & 3 deletions driver/connect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,7 @@ SQLRETURN SQL_API MySQLConnect(SQLHDBC hdbc,
dbc->init_proxy_chain(ds);
dbc->connection_handler = std::make_shared<CONNECTION_HANDLER>(dbc);
dbc->fh = new FAILOVER_HANDLER(dbc, ds);
rc = dbc->fh->init_cluster_info();
rc = dbc->fh->init_connection();
if (!dbc->ds)
ds_delete(ds);
return rc;
Expand Down Expand Up @@ -1239,7 +1239,7 @@ SQLRETURN SQL_API MySQLDriverConnect(SQLHDBC hdbc, SQLHWND hwnd,
dbc->init_proxy_chain(ds);
dbc->connection_handler = std::make_shared<CONNECTION_HANDLER>(dbc);
dbc->fh = new FAILOVER_HANDLER(dbc, ds);
rc = dbc->fh->init_cluster_info();
rc = dbc->fh->init_connection();
if (rc == SQL_SUCCESS || rc == SQL_SUCCESS_WITH_INFO)
goto connected;
bPrompt= TRUE;
Expand Down Expand Up @@ -1419,7 +1419,7 @@ SQLRETURN SQL_API MySQLDriverConnect(SQLHDBC hdbc, SQLHWND hwnd,
dbc->init_proxy_chain(ds);
dbc->connection_handler = std::make_shared<CONNECTION_HANDLER>(dbc);
dbc->fh = new FAILOVER_HANDLER(dbc, ds);
rc = dbc->fh->init_cluster_info();
rc = dbc->fh->init_connection();
if (rc != SQL_SUCCESS && rc != SQL_SUCCESS_WITH_INFO)
{
goto error;
Expand Down
15 changes: 10 additions & 5 deletions driver/failover.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class FAILOVER_HANDLER {
std::shared_ptr<TOPOLOGY_SERVICE> topology_service,
std::shared_ptr<CLUSTER_AWARE_METRICS_CONTAINER> metrics_container);
~FAILOVER_HANDLER();
SQLRETURN init_cluster_info();
SQLRETURN init_connection();
bool trigger_failover_if_needed(const char* error_code, const char*& new_error_code, const char*& error_msg);
bool is_failover_enabled();
bool is_rds();
Expand All @@ -190,15 +190,20 @@ class FAILOVER_HANDLER {
bool m_is_rds_proxy = false;
bool m_is_rds = false;
bool m_is_rds_custom_cluster = false;
bool initialized = false;
bool is_cluster_info_initialized = false;

void init_cluster_info();
bool should_connect_to_new_writer();
void initialize_topology();
bool is_read_only();
virtual std::string host_to_IP(std::string host);
SQLRETURN reconnect(bool failover_enabled);
static bool is_dns_pattern_valid(std::string host);
static bool is_rds_dns(std::string host);
static bool is_rds_cluster_dns(std::string host);
static bool is_rds_proxy_dns(std::string host);
static bool is_rds_writer_cluster_dns(std::string host);
static bool is_rds_custom_cluster_dns(std::string host);
SQLRETURN create_connection_and_initialize_topology();
SQLRETURN reconnect(bool failover_enabled);
static std::string get_rds_cluster_host_url(std::string host);
static std::string get_rds_instance_host_pattern(std::string host);
bool is_ipv4(std::string host);
Expand All @@ -213,7 +218,7 @@ class FAILOVER_HANDLER {
std::chrono::steady_clock::time_point failover_start_time_ms;

#ifdef UNIT_TEST_BUILD
// Allows for testing private methods
// Allows for testing private/protected methods
friend class TEST_UTILS;
#endif
};
Expand Down
189 changes: 152 additions & 37 deletions driver/failover_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@
#include <sstream>

#include "driver.h"
#include "mylog.h"

#if defined(__APPLE__) || defined(__linux__)
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#endif

namespace {
const std::regex AURORA_DNS_PATTERN(
Expand All @@ -47,6 +56,9 @@ const std::regex AURORA_PROXY_DNS_PATTERN(
const std::regex AURORA_CLUSTER_PATTERN(
R"#((.+)\.(cluster-|cluster-ro-)+([a-zA-Z0-9]+\.[a-zA-Z0-9\-]+\.rds\.amazonaws\.com))#",
std::regex_constants::icase);
const std::regex AURORA_WRITER_CLUSTER_PATTERN(
R"#((.+)\.(cluster-)+([a-zA-Z0-9]+\.[a-zA-Z0-9\-]+\.rds\.amazonaws\.com))#",
std::regex_constants::icase);
const std::regex AURORA_CUSTOM_CLUSTER_PATTERN(
R"#((.+)\.(cluster-custom-)+([a-zA-Z0-9]+\.[a-zA-Z0-9\-]+\.rds\.amazonaws\.com))#",
std::regex_constants::icase);
Expand All @@ -59,6 +71,9 @@ const std::regex AURORA_CHINA_PROXY_DNS_PATTERN(
const std::regex AURORA_CHINA_CLUSTER_PATTERN(
R"#((.+)\.(cluster-|cluster-ro-)+([a-zA-Z0-9]+\.rds\.[a-zA-Z0-9\-]+\.amazonaws\.com\.cn))#",
std::regex_constants::icase);
const std::regex AURORA_CHINA_WRITER_CLUSTER_PATTERN(
R"#((.+)\.(cluster-)+([a-zA-Z0-9]+\.rds\.[a-zA-Z0-9\-]+\.amazonaws\.com\.cn))#",
std::regex_constants::icase);
const std::regex AURORA_CHINA_CUSTOM_CLUSTER_PATTERN(
R"#((.+)\.(cluster-custom-)+([a-zA-Z0-9]+\.rds\.[a-zA-Z0-9\-]+\.amazonaws\.com\.cn))#",
std::regex_constants::icase);
Expand All @@ -67,6 +82,8 @@ const std::regex IPV4_PATTERN(
const std::regex IPV6_PATTERN(R"#(^[0-9a-fA-F]{1,4}(:[0-9a-fA-F]{1,4}){7}$)#");
const std::regex IPV6_COMPRESSED_PATTERN(
R"#(^(([0-9A-Fa-f]{1,4}(:[0-9A-Fa-f]{1,4}){0,5})?)::(([0-9A-Fa-f]{1,4}(:[0-9A-Fa-f]{1,4}){0,5})?)$)#");

const char* MYSQL_READONLY_QUERY = "SELECT @@innodb_read_only AS is_reader";
} // namespace

FAILOVER_HANDLER::FAILOVER_HANDLER(DBC* dbc, DataSource* ds)
Expand Down Expand Up @@ -108,18 +125,45 @@ FAILOVER_HANDLER::FAILOVER_HANDLER(DBC* dbc, DataSource* ds,

FAILOVER_HANDLER::~FAILOVER_HANDLER() {}

SQLRETURN FAILOVER_HANDLER::init_cluster_info() {
SQLRETURN rc = SQL_ERROR;
if (initialized) {
return rc;
SQLRETURN FAILOVER_HANDLER::init_connection() {
SQLRETURN rc = connection_handler->do_connect(dbc, ds, false);
if (SQL_SUCCEEDED(rc)) {
metrics_container->register_invalid_initial_connection(false);
}

if (!ds->enable_cluster_failover) {
// Use a standard default connection - no further initialization required
rc = connection_handler->do_connect(dbc, ds, false);
initialized = true;
else {
metrics_container->register_invalid_initial_connection(true);
return rc;
}

bool reconnect_with_updated_timeouts = false;
if (ds->enable_cluster_failover) {
this->init_cluster_info();

if (is_failover_enabled()) {
// Since we can't determine whether failover should be enabled
// before we connect, there is a possibility we need to reconnect
// again with the correct connection settings for failover.
const unsigned int connect_timeout = get_connect_timeout(ds->connect_timeout);
const unsigned int network_timeout = get_network_timeout(ds->network_timeout);

reconnect_with_updated_timeouts = (connect_timeout != dbc->login_timeout ||
network_timeout != ds->read_timeout ||
network_timeout != ds->write_timeout);
}
}

if (should_connect_to_new_writer() || reconnect_with_updated_timeouts) {
rc = reconnect(reconnect_with_updated_timeouts);
}

return rc;
}

void FAILOVER_HANDLER::init_cluster_info() {
if (is_cluster_info_initialized) {
return;
}

std::stringstream err;
// Cluster-aware failover is enabled

Expand Down Expand Up @@ -199,7 +243,7 @@ SQLRETURN FAILOVER_HANDLER::init_cluster_info() {
}
}

rc = create_connection_and_initialize_topology();
initialize_topology();
} else if (is_ipv4(main_host) || is_ipv6(main_host)) {
// TODO: do we need to setup host template in this case?
// HOST_INFO* host_template = new HOST_INFO();
Expand All @@ -211,7 +255,7 @@ SQLRETURN FAILOVER_HANDLER::init_cluster_info() {
set_cluster_id(clid_str);
}

rc = create_connection_and_initialize_topology();
initialize_topology();

if (m_is_cluster_topology_available) {
err << "Host Pattern configuration setting is required when IP "
Expand Down Expand Up @@ -242,7 +286,7 @@ SQLRETURN FAILOVER_HANDLER::init_cluster_info() {
set_cluster_id(clid_str);
}

rc = create_connection_and_initialize_topology();
initialize_topology();

if (m_is_cluster_topology_available) {
err << "The provided host appears to be a custom domain. The "
Expand Down Expand Up @@ -288,12 +332,58 @@ SQLRETURN FAILOVER_HANDLER::init_cluster_info() {
}
}

rc = create_connection_and_initialize_topology();
initialize_topology();
}
}

initialized = true;
return rc;
is_cluster_info_initialized = true;
}

bool FAILOVER_HANDLER::should_connect_to_new_writer() {
auto host = (const char*)ds->server8;
if (host == nullptr || host == "") {
return false;
}

if (!is_rds_writer_cluster_dns(host)) {
return false;
}

std::string host_ip = host_to_IP(host);
if (host_ip == "") {
return false;
}

this->init_cluster_info();

// We need to force refresh the topology if we are connected to a read only instance.
auto topology = topology_service->get_topology(dbc->connection_proxy, is_read_only());

std::shared_ptr<HOST_INFO> writer;
try {
writer = topology->get_writer();
}
catch (std::runtime_error) {
return false;
}

std::string writer_host = writer->get_host();
if (is_rds_cluster_dns(writer_host.c_str())) {
return false;
}

std::string writer_host_ip = host_to_IP(writer_host);
if (writer_host_ip == "" || writer_host_ip == host_ip) {
return false;
}

// DNS must have resolved the cluster endpoint to a wrong writer
// so we should reconnect to a proper writer node.
const sqlwchar_string writer_host_wstr = to_sqlwchar_string(writer_host);
ds_set_wstrnattr(&ds->server, (SQLWCHAR*)writer_host_wstr.c_str(), writer_host_wstr.size());
ds_set_strnattr(&ds->server8, (SQLCHAR*)writer_host.c_str(), writer_host.size());

return true;
}

void FAILOVER_HANDLER::set_cluster_id(std::string host, int port) {
Expand Down Expand Up @@ -322,10 +412,55 @@ bool FAILOVER_HANDLER::is_rds_proxy_dns(std::string host) {
return std::regex_match(host, AURORA_PROXY_DNS_PATTERN) || std::regex_match(host, AURORA_CHINA_PROXY_DNS_PATTERN);
}

bool FAILOVER_HANDLER::is_rds_writer_cluster_dns(std::string host) {
return std::regex_match(host, AURORA_WRITER_CLUSTER_PATTERN) || std::regex_match(host, AURORA_CHINA_WRITER_CLUSTER_PATTERN);
}

bool FAILOVER_HANDLER::is_rds_custom_cluster_dns(std::string host) {
return std::regex_match(host, AURORA_CUSTOM_CLUSTER_PATTERN) || std::regex_match(host, AURORA_CHINA_CUSTOM_CLUSTER_PATTERN);
}

bool FAILOVER_HANDLER::is_read_only() {
bool read_only = false;
if (dbc->connection_proxy->query(MYSQL_READONLY_QUERY) == 0) {
auto result = dbc->connection_proxy->store_result();
MYSQL_ROW row;
if (row = dbc->connection_proxy->fetch_row(result)) {
read_only = (strcmp(row[0], "1") == 0);
}
dbc->connection_proxy->free_result(result);
}

return read_only;
}

std::string FAILOVER_HANDLER::host_to_IP(std::string host) {
int status;
struct addrinfo hints;
struct addrinfo* servinfo;
struct addrinfo* p;
char ipstr[INET_ADDRSTRLEN];

memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET; //IPv4
hints.ai_socktype = SOCK_STREAM;

if ((status = getaddrinfo(host.c_str(), NULL, &hints, &servinfo)) != 0) {
return "";
}

for (p = servinfo; p != NULL; p = p->ai_next) {
void* addr;

struct sockaddr_in* ipv4 = (struct sockaddr_in*)p->ai_addr;
addr = &(ipv4->sin_addr);
inet_ntop(p->ai_family, addr, ipstr, sizeof(ipstr));
}

freeaddrinfo(servinfo);
return std::string(ipstr);
}

#if defined(__APPLE__) || defined(__linux__)
#define strcmp_case_insensitive(str1, str2) strcasecmp(str1, str2)
#else
Expand Down Expand Up @@ -398,39 +533,19 @@ bool FAILOVER_HANDLER::is_cluster_topology_available() {
return m_is_cluster_topology_available;
}

SQLRETURN FAILOVER_HANDLER::create_connection_and_initialize_topology() {
SQLRETURN rc = connection_handler->do_connect(dbc, ds, false);
if (!SQL_SUCCEEDED(rc)) {
metrics_container->register_invalid_initial_connection(true);
return rc;
}

metrics_container->register_invalid_initial_connection(false);
void FAILOVER_HANDLER::initialize_topology() {

current_topology = topology_service->get_topology(dbc->connection_proxy, false);
if (current_topology) {
m_is_multi_writer_cluster = current_topology->is_multi_writer_cluster;
m_is_cluster_topology_available = current_topology->total_hosts() > 0;
MYLOG_DBC_TRACE(dbc,
"[FAILOVER_HANDLER] m_is_cluster_topology_available=%s",
m_is_cluster_topology_available ? "true" : "false");

// Since we can't determine whether failover should be enabled
// before we connect, there is a possibility we need to reconnect
// again with the correct connection settings for failover.
const unsigned int connect_timeout = get_connect_timeout(ds->connect_timeout);
const unsigned int network_timeout = get_network_timeout(ds->network_timeout);

if (is_failover_enabled() && (connect_timeout != dbc->login_timeout ||
network_timeout != ds->read_timeout ||
network_timeout != ds->write_timeout)) {
rc = reconnect(true);
}
if (is_failover_enabled()) {
this->dbc->env->failover_thread_pool.resize(current_topology->total_hosts());
}
}

return rc;
}

SQLRETURN FAILOVER_HANDLER::reconnect(bool failover_enabled) {
Expand Down
6 changes: 6 additions & 0 deletions integration/base_failover_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,12 @@ class BaseFailoverIntegrationTest : public testing::Test {
}

void test_connection(const SQLHDBC dbc, const std::string& test_server, const int test_port) {
sprintf(reinterpret_cast<char*>(conn_in), "%sSERVER=%s;PORT=%d;", get_default_config().c_str(), test_server.c_str(), test_port);
EXPECT_EQ(SQL_SUCCESS, SQLDriverConnect(dbc, nullptr, conn_in, SQL_NTS, conn_out, MAX_NAME_LEN, &len, SQL_DRIVER_NOPROMPT));
EXPECT_EQ(SQL_SUCCESS, SQLDisconnect(dbc));
}

void test_connection_with_proxy_pattern(const SQLHDBC dbc, const std::string& test_server, const int test_port) {
sprintf(reinterpret_cast<char*>(conn_in), "%sSERVER=%s;PORT=%d;", get_default_proxied_config().c_str(), test_server.c_str(), test_port);
EXPECT_EQ(SQL_SUCCESS, SQLDriverConnect(dbc, nullptr, conn_in, SQL_NTS, conn_out, MAX_NAME_LEN, &len, SQL_DRIVER_NOPROMPT));
EXPECT_EQ(SQL_SUCCESS, SQLDisconnect(dbc));
Expand Down
6 changes: 3 additions & 3 deletions integration/network_failover_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ class NetworkFailoverIntegrationTest : public BaseFailoverIntegrationTest {

TEST_F(NetworkFailoverIntegrationTest, connection_test) {
test_connection(dbc, MYSQL_INSTANCE_1_URL, MYSQL_PORT);
test_connection(dbc, MYSQL_INSTANCE_1_URL + PROXIED_DOMAIN_NAME_SUFFIX, MYSQL_PROXY_PORT);
test_connection_with_proxy_pattern(dbc, MYSQL_INSTANCE_1_URL + PROXIED_DOMAIN_NAME_SUFFIX, MYSQL_PROXY_PORT);
test_connection(dbc, MYSQL_CLUSTER_URL, MYSQL_PORT);
test_connection(dbc, MYSQL_CLUSTER_URL + PROXIED_DOMAIN_NAME_SUFFIX, MYSQL_PROXY_PORT);
test_connection_with_proxy_pattern(dbc, MYSQL_CLUSTER_URL + PROXIED_DOMAIN_NAME_SUFFIX, MYSQL_PROXY_PORT);
test_connection(dbc, MYSQL_RO_CLUSTER_URL, MYSQL_PORT);
test_connection(dbc, MYSQL_RO_CLUSTER_URL + PROXIED_DOMAIN_NAME_SUFFIX, MYSQL_PROXY_PORT);
test_connection_with_proxy_pattern(dbc, MYSQL_RO_CLUSTER_URL + PROXIED_DOMAIN_NAME_SUFFIX, MYSQL_PROXY_PORT);
}

TEST_F(NetworkFailoverIntegrationTest, lost_connection_to_writer) {
Expand Down
Loading

0 comments on commit d17c37a

Please sign in to comment.