Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify Writer Cluster Connections #139

Merged
merged 3 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions driver/connect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ SQLRETURN SQL_API MySQLConnect(SQLHDBC hdbc,
dbc->init_proxy_chain(ds);
dbc->connection_handler = std::make_shared<CONNECTION_HANDLER>(dbc);
dbc->fh = new FAILOVER_HANDLER(dbc, ds);
rc = dbc->fh->init_cluster_info();
rc = dbc->fh->init_connection();
if (!dbc->ds)
ds_delete(ds);
return rc;
Expand Down Expand Up @@ -1165,7 +1165,7 @@ SQLRETURN SQL_API MySQLDriverConnect(SQLHDBC hdbc, SQLHWND hwnd,
dbc->init_proxy_chain(ds);
dbc->connection_handler = std::make_shared<CONNECTION_HANDLER>(dbc);
dbc->fh = new FAILOVER_HANDLER(dbc, ds);
rc = dbc->fh->init_cluster_info();
rc = dbc->fh->init_connection();
if (rc == SQL_SUCCESS || rc == SQL_SUCCESS_WITH_INFO)
goto connected;
bPrompt= TRUE;
Expand Down Expand Up @@ -1345,7 +1345,7 @@ SQLRETURN SQL_API MySQLDriverConnect(SQLHDBC hdbc, SQLHWND hwnd,
dbc->init_proxy_chain(ds);
dbc->connection_handler = std::make_shared<CONNECTION_HANDLER>(dbc);
dbc->fh = new FAILOVER_HANDLER(dbc, ds);
rc = dbc->fh->init_cluster_info();
rc = dbc->fh->init_connection();
if (rc != SQL_SUCCESS && rc != SQL_SUCCESS_WITH_INFO)
{
goto error;
Expand Down
15 changes: 10 additions & 5 deletions driver/failover.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class FAILOVER_HANDLER {
std::shared_ptr<TOPOLOGY_SERVICE> topology_service,
std::shared_ptr<CLUSTER_AWARE_METRICS_CONTAINER> metrics_container);
~FAILOVER_HANDLER();
SQLRETURN init_cluster_info();
SQLRETURN init_connection();
bool trigger_failover_if_needed(const char* error_code, const char*& new_error_code, const char*& error_msg);
bool is_failover_enabled();
bool is_rds();
Expand All @@ -190,15 +190,20 @@ class FAILOVER_HANDLER {
bool m_is_rds_proxy = false;
bool m_is_rds = false;
bool m_is_rds_custom_cluster = false;
bool initialized = false;
bool is_cluster_info_initialized = false;

void init_cluster_info();
bool should_connect_to_new_writer();
void initialize_topology();
bool is_read_only();
virtual std::string host_to_IP(std::string host);
SQLRETURN reconnect(bool failover_enabled);
static bool is_dns_pattern_valid(std::string host);
static bool is_rds_dns(std::string host);
static bool is_rds_cluster_dns(std::string host);
static bool is_rds_proxy_dns(std::string host);
static bool is_rds_writer_cluster_dns(std::string host);
static bool is_rds_custom_cluster_dns(std::string host);
SQLRETURN create_connection_and_initialize_topology();
SQLRETURN reconnect(bool failover_enabled);
static std::string get_rds_cluster_host_url(std::string host);
static std::string get_rds_instance_host_pattern(std::string host);
bool is_ipv4(std::string host);
Expand All @@ -213,7 +218,7 @@ class FAILOVER_HANDLER {
std::chrono::steady_clock::time_point failover_start_time_ms;

#ifdef UNIT_TEST_BUILD
// Allows for testing private methods
// Allows for testing private/protected methods
friend class TEST_UTILS;
#endif
};
Expand Down
189 changes: 152 additions & 37 deletions driver/failover_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@
#include <sstream>

#include "driver.h"
#include "mylog.h"

#if defined(__APPLE__) || defined(__linux__)
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#endif

namespace {
const std::regex AURORA_DNS_PATTERN(
Expand All @@ -47,6 +56,9 @@ const std::regex AURORA_PROXY_DNS_PATTERN(
const std::regex AURORA_CLUSTER_PATTERN(
R"#((.+)\.(cluster-|cluster-ro-)+([a-zA-Z0-9]+\.[a-zA-Z0-9\-]+\.rds\.amazonaws\.com))#",
std::regex_constants::icase);
const std::regex AURORA_WRITER_CLUSTER_PATTERN(
R"#((.+)\.(cluster-)+([a-zA-Z0-9]+\.[a-zA-Z0-9\-]+\.rds\.amazonaws\.com))#",
std::regex_constants::icase);
const std::regex AURORA_CUSTOM_CLUSTER_PATTERN(
R"#((.+)\.(cluster-custom-)+([a-zA-Z0-9]+\.[a-zA-Z0-9\-]+\.rds\.amazonaws\.com))#",
std::regex_constants::icase);
Expand All @@ -59,6 +71,9 @@ const std::regex AURORA_CHINA_PROXY_DNS_PATTERN(
const std::regex AURORA_CHINA_CLUSTER_PATTERN(
R"#((.+)\.(cluster-|cluster-ro-)+([a-zA-Z0-9]+\.rds\.[a-zA-Z0-9\-]+\.amazonaws\.com\.cn))#",
std::regex_constants::icase);
const std::regex AURORA_CHINA_WRITER_CLUSTER_PATTERN(
R"#((.+)\.(cluster-)+([a-zA-Z0-9]+\.rds\.[a-zA-Z0-9\-]+\.amazonaws\.com\.cn))#",
std::regex_constants::icase);
const std::regex AURORA_CHINA_CUSTOM_CLUSTER_PATTERN(
R"#((.+)\.(cluster-custom-)+([a-zA-Z0-9]+\.rds\.[a-zA-Z0-9\-]+\.amazonaws\.com\.cn))#",
std::regex_constants::icase);
Expand All @@ -67,6 +82,8 @@ const std::regex IPV4_PATTERN(
const std::regex IPV6_PATTERN(R"#(^[0-9a-fA-F]{1,4}(:[0-9a-fA-F]{1,4}){7}$)#");
const std::regex IPV6_COMPRESSED_PATTERN(
R"#(^(([0-9A-Fa-f]{1,4}(:[0-9A-Fa-f]{1,4}){0,5})?)::(([0-9A-Fa-f]{1,4}(:[0-9A-Fa-f]{1,4}){0,5})?)$)#");

const char* MYSQL_READONLY_QUERY = "SELECT @@innodb_read_only AS is_reader";
} // namespace

FAILOVER_HANDLER::FAILOVER_HANDLER(DBC* dbc, DataSource* ds)
Expand Down Expand Up @@ -108,18 +125,45 @@ FAILOVER_HANDLER::FAILOVER_HANDLER(DBC* dbc, DataSource* ds,

FAILOVER_HANDLER::~FAILOVER_HANDLER() {}

SQLRETURN FAILOVER_HANDLER::init_cluster_info() {
SQLRETURN rc = SQL_ERROR;
if (initialized) {
return rc;
SQLRETURN FAILOVER_HANDLER::init_connection() {
SQLRETURN rc = connection_handler->do_connect(dbc, ds, false);
if (SQL_SUCCEEDED(rc)) {
metrics_container->register_invalid_initial_connection(false);
}

if (!ds->enable_cluster_failover) {
// Use a standard default connection - no further initialization required
rc = connection_handler->do_connect(dbc, ds, false);
initialized = true;
else {
metrics_container->register_invalid_initial_connection(true);
return rc;
}

bool reconnect_with_updated_timeouts = false;
if (ds->enable_cluster_failover) {
this->init_cluster_info();

if (is_failover_enabled()) {
// Since we can't determine whether failover should be enabled
// before we connect, there is a possibility we need to reconnect
// again with the correct connection settings for failover.
const unsigned int connect_timeout = get_connect_timeout(ds->connect_timeout);
const unsigned int network_timeout = get_network_timeout(ds->network_timeout);

reconnect_with_updated_timeouts = (connect_timeout != dbc->login_timeout ||
network_timeout != ds->read_timeout ||
network_timeout != ds->write_timeout);
}
}

if (should_connect_to_new_writer() || reconnect_with_updated_timeouts) {
rc = reconnect(reconnect_with_updated_timeouts);
}

return rc;
}

void FAILOVER_HANDLER::init_cluster_info() {
if (is_cluster_info_initialized) {
return;
}

std::stringstream err;
// Cluster-aware failover is enabled

Expand Down Expand Up @@ -199,7 +243,7 @@ SQLRETURN FAILOVER_HANDLER::init_cluster_info() {
}
}

rc = create_connection_and_initialize_topology();
initialize_topology();
} else if (is_ipv4(main_host) || is_ipv6(main_host)) {
// TODO: do we need to setup host template in this case?
// HOST_INFO* host_template = new HOST_INFO();
Expand All @@ -211,7 +255,7 @@ SQLRETURN FAILOVER_HANDLER::init_cluster_info() {
set_cluster_id(clid_str);
}

rc = create_connection_and_initialize_topology();
initialize_topology();

if (m_is_cluster_topology_available) {
err << "Host Pattern configuration setting is required when IP "
Expand Down Expand Up @@ -242,7 +286,7 @@ SQLRETURN FAILOVER_HANDLER::init_cluster_info() {
set_cluster_id(clid_str);
}

rc = create_connection_and_initialize_topology();
initialize_topology();

if (m_is_cluster_topology_available) {
err << "The provided host appears to be a custom domain. The "
Expand Down Expand Up @@ -288,12 +332,58 @@ SQLRETURN FAILOVER_HANDLER::init_cluster_info() {
}
}

rc = create_connection_and_initialize_topology();
initialize_topology();
}
}

initialized = true;
return rc;
is_cluster_info_initialized = true;
}

bool FAILOVER_HANDLER::should_connect_to_new_writer() {
auto host = (const char*)ds->server8;
if (host == nullptr || host == "") {
return false;
}

if (!is_rds_writer_cluster_dns(host)) {
return false;
}

std::string host_ip = host_to_IP(host);
if (host_ip == "") {
return false;
}

this->init_cluster_info();

// We need to force refresh the topology if we are connected to a read only instance.
auto topology = topology_service->get_topology(dbc->connection_proxy, is_read_only());

std::shared_ptr<HOST_INFO> writer;
try {
writer = topology->get_writer();
}
catch (std::runtime_error) {
return false;
}

std::string writer_host = writer->get_host();
if (is_rds_cluster_dns(writer_host.c_str())) {
return false;
}

std::string writer_host_ip = host_to_IP(writer_host);
if (writer_host_ip == "" || writer_host_ip == host_ip) {
return false;
}

// DNS must have resolved the cluster endpoint to a wrong writer
// so we should reconnect to a proper writer node.
const sqlwchar_string writer_host_wstr = to_sqlwchar_string(writer_host);
ds_set_wstrnattr(&ds->server, (SQLWCHAR*)writer_host_wstr.c_str(), writer_host_wstr.size());
ds_set_strnattr(&ds->server8, (SQLCHAR*)writer_host.c_str(), writer_host.size());

return true;
}

void FAILOVER_HANDLER::set_cluster_id(std::string host, int port) {
Expand Down Expand Up @@ -322,10 +412,55 @@ bool FAILOVER_HANDLER::is_rds_proxy_dns(std::string host) {
return std::regex_match(host, AURORA_PROXY_DNS_PATTERN) || std::regex_match(host, AURORA_CHINA_PROXY_DNS_PATTERN);
}

bool FAILOVER_HANDLER::is_rds_writer_cluster_dns(std::string host) {
return std::regex_match(host, AURORA_WRITER_CLUSTER_PATTERN) || std::regex_match(host, AURORA_CHINA_WRITER_CLUSTER_PATTERN);
}

bool FAILOVER_HANDLER::is_rds_custom_cluster_dns(std::string host) {
return std::regex_match(host, AURORA_CUSTOM_CLUSTER_PATTERN) || std::regex_match(host, AURORA_CHINA_CUSTOM_CLUSTER_PATTERN);
}

bool FAILOVER_HANDLER::is_read_only() {
bool read_only = false;
if (dbc->connection_proxy->query(MYSQL_READONLY_QUERY) == 0) {
auto result = dbc->connection_proxy->store_result();
yanw-bq marked this conversation as resolved.
Show resolved Hide resolved
MYSQL_ROW row;
if (row = dbc->connection_proxy->fetch_row(result)) {
read_only = (strcmp(row[0], "1") == 0);
}
dbc->connection_proxy->free_result(result);
}

return read_only;
}

std::string FAILOVER_HANDLER::host_to_IP(std::string host) {
int status;
struct addrinfo hints;
struct addrinfo* servinfo;
struct addrinfo* p;
char ipstr[INET_ADDRSTRLEN];

memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET; //IPv4
hints.ai_socktype = SOCK_STREAM;

if ((status = getaddrinfo(host.c_str(), NULL, &hints, &servinfo)) != 0) {
return "";
}

for (p = servinfo; p != NULL; p = p->ai_next) {
void* addr;

struct sockaddr_in* ipv4 = (struct sockaddr_in*)p->ai_addr;
addr = &(ipv4->sin_addr);
inet_ntop(p->ai_family, addr, ipstr, sizeof(ipstr));
}

freeaddrinfo(servinfo);
return std::string(ipstr);
}

#if defined(__APPLE__) || defined(__linux__)
#define strcmp_case_insensitive(str1, str2) strcasecmp(str1, str2)
#else
Expand Down Expand Up @@ -398,39 +533,19 @@ bool FAILOVER_HANDLER::is_cluster_topology_available() {
return m_is_cluster_topology_available;
}

SQLRETURN FAILOVER_HANDLER::create_connection_and_initialize_topology() {
SQLRETURN rc = connection_handler->do_connect(dbc, ds, false);
if (!SQL_SUCCEEDED(rc)) {
metrics_container->register_invalid_initial_connection(true);
return rc;
}

metrics_container->register_invalid_initial_connection(false);
void FAILOVER_HANDLER::initialize_topology() {

current_topology = topology_service->get_topology(dbc->connection_proxy, false);
if (current_topology) {
m_is_multi_writer_cluster = current_topology->is_multi_writer_cluster;
m_is_cluster_topology_available = current_topology->total_hosts() > 0;
MYLOG_DBC_TRACE(dbc,
"[FAILOVER_HANDLER] m_is_cluster_topology_available=%s",
m_is_cluster_topology_available ? "true" : "false");

// Since we can't determine whether failover should be enabled
// before we connect, there is a possibility we need to reconnect
// again with the correct connection settings for failover.
const unsigned int connect_timeout = get_connect_timeout(ds->connect_timeout);
const unsigned int network_timeout = get_network_timeout(ds->network_timeout);

if (is_failover_enabled() && (connect_timeout != dbc->login_timeout ||
network_timeout != ds->read_timeout ||
network_timeout != ds->write_timeout)) {
rc = reconnect(true);
}
if (is_failover_enabled()) {
this->dbc->env->failover_thread_pool.resize(current_topology->total_hosts());
}
}

return rc;
}

SQLRETURN FAILOVER_HANDLER::reconnect(bool failover_enabled) {
Expand Down
6 changes: 6 additions & 0 deletions integration/base_failover_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,12 @@ class BaseFailoverIntegrationTest : public testing::Test {
}

void test_connection(const SQLHDBC dbc, const std::string& test_server, const int test_port) {
sprintf(reinterpret_cast<char*>(conn_in), "%sSERVER=%s;PORT=%d;", get_default_config().c_str(), test_server.c_str(), test_port);
EXPECT_EQ(SQL_SUCCESS, SQLDriverConnect(dbc, nullptr, conn_in, SQL_NTS, conn_out, MAX_NAME_LEN, &len, SQL_DRIVER_NOPROMPT));
EXPECT_EQ(SQL_SUCCESS, SQLDisconnect(dbc));
}

void test_connection_with_proxy_pattern(const SQLHDBC dbc, const std::string& test_server, const int test_port) {
sprintf(reinterpret_cast<char*>(conn_in), "%sSERVER=%s;PORT=%d;", get_default_proxied_config().c_str(), test_server.c_str(), test_port);
EXPECT_EQ(SQL_SUCCESS, SQLDriverConnect(dbc, nullptr, conn_in, SQL_NTS, conn_out, MAX_NAME_LEN, &len, SQL_DRIVER_NOPROMPT));
EXPECT_EQ(SQL_SUCCESS, SQLDisconnect(dbc));
Expand Down
6 changes: 3 additions & 3 deletions integration/network_failover_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ class NetworkFailoverIntegrationTest : public BaseFailoverIntegrationTest {

TEST_F(NetworkFailoverIntegrationTest, connection_test) {
test_connection(dbc, MYSQL_INSTANCE_1_URL, MYSQL_PORT);
test_connection(dbc, MYSQL_INSTANCE_1_URL + PROXIED_DOMAIN_NAME_SUFFIX, MYSQL_PROXY_PORT);
test_connection_with_proxy_pattern(dbc, MYSQL_INSTANCE_1_URL + PROXIED_DOMAIN_NAME_SUFFIX, MYSQL_PROXY_PORT);
test_connection(dbc, MYSQL_CLUSTER_URL, MYSQL_PORT);
test_connection(dbc, MYSQL_CLUSTER_URL + PROXIED_DOMAIN_NAME_SUFFIX, MYSQL_PROXY_PORT);
test_connection_with_proxy_pattern(dbc, MYSQL_CLUSTER_URL + PROXIED_DOMAIN_NAME_SUFFIX, MYSQL_PROXY_PORT);
test_connection(dbc, MYSQL_RO_CLUSTER_URL, MYSQL_PORT);
test_connection(dbc, MYSQL_RO_CLUSTER_URL + PROXIED_DOMAIN_NAME_SUFFIX, MYSQL_PROXY_PORT);
test_connection_with_proxy_pattern(dbc, MYSQL_RO_CLUSTER_URL + PROXIED_DOMAIN_NAME_SUFFIX, MYSQL_PROXY_PORT);
}

TEST_F(NetworkFailoverIntegrationTest, lost_connection_to_writer) {
Expand Down
Loading