Skip to content

Commit

Permalink
Fix failover timeout not obeyed (#138)
Browse files Browse the repository at this point in the history
* replace async future with packed_task and thread to avoid blocking future destructor

* fix unit tests

* add template arguments for std::packaged_task

* missing template arguments

* use thread pool to run failover and wait for all when driver unloads, fix logger

* increase thread pool size

* fix reader failover where failed result overwrite successful result

* not stop thread pool on free env handle

* redirect maven central and stop thread pool in teardown

* maven central

* add thread id to logs

* move future valid check before wait for

* move future valid check before wait_for() for writer failover

* mark logger as extern

* test diable logs

* test failover_integration_test first

* test

* debug gh action

* make env var available for debug session

* debug in docker

* refactor integration tests to use shared pointer for rds client

* make rds client back to non static

* fix build

* reset rds client in teardown

* Revert "refactor integration tests to use shared pointer for rds client"

This reverts commit 2ec1ae5.

* disable remote debug

* fix build

* debug

* specifically call thread pool stop in myodbc_end

* move failover thread pool inside env

* fix unit test

* fix build

* cleanup
  • Loading branch information
yanw-bq committed May 18, 2023
1 parent 616b129 commit 71ce777
Show file tree
Hide file tree
Showing 27 changed files with 403 additions and 306 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/dockerized.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ on:
- '**/*.md'
- '**/*.jpg'
- '**/README.txt'
- '**/LICENSE.txt'
- 'docs/**'
- 'ISSUE_TEMPLATE/**'
- '**/remove-old-artifacts.yml'
Expand Down Expand Up @@ -179,7 +180,7 @@ jobs:
echo "TEMP_AWS_ACCESS_KEY_ID=${creds[0]}" >> $GITHUB_ENV
echo "TEMP_AWS_SECRET_ACCESS_KEY=${creds[1]}" >> $GITHUB_ENV
echo "TEMP_AWS_SESSION_TOKEN=${creds[2]}" >> $GITHUB_ENV
- name: 'Run Integration Tests'
working-directory: ${{ github.workspace }}/testframework
run: |
Expand Down
2 changes: 1 addition & 1 deletion driver/cluster_aware_metrics_container.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void CLUSTER_AWARE_METRICS_CONTAINER::set_gather_metric(bool gather) {
this->can_gather = gather;
}

void CLUSTER_AWARE_METRICS_CONTAINER::report_metrics(std::string conn_url, bool for_instances, FILE* log, unsigned long dbc_id) {
void CLUSTER_AWARE_METRICS_CONTAINER::report_metrics(std::string conn_url, bool for_instances, std::shared_ptr<FILE> log, unsigned long dbc_id) {
if (!log) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion driver/cluster_aware_metrics_container.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class CLUSTER_AWARE_METRICS_CONTAINER {

void set_gather_metric(bool gather);

static void report_metrics(std::string conn_url, bool for_instances, FILE* log, unsigned long dbc_id);
static void report_metrics(std::string conn_url, bool for_instances, std::shared_ptr<FILE> log, unsigned long dbc_id);
static std::string report_metrics(std::string conn_url, bool for_instances);
static void reset_metrics();

Expand Down
8 changes: 3 additions & 5 deletions driver/connect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,15 +285,15 @@ std::shared_ptr<HOST_INFO> get_host_info_from_ds(DataSource* ds) {
} catch (std::string &) {
err << "Invalid server '" << ds->server8 << "'.";
if (ds->save_queries) {
MYLOG_TRACE(init_log_file().get(), 0, err.str().c_str());
MYLOG_TRACE(init_log_file(), 0, err.str().c_str());
}
throw std::runtime_error(err.str());
}

if (hosts.size() == 0) {
err << "No host was retrieved from the data source.";
if (ds->save_queries) {
MYLOG_TRACE(init_log_file().get(), 0, err.str().c_str());
MYLOG_TRACE(init_log_file(), 0, err.str().c_str());
}
throw std::runtime_error(err.str());
}
Expand Down Expand Up @@ -1535,9 +1535,7 @@ SQLRETURN SQL_API SQLDisconnect(SQLHDBC hdbc)
cluster_id_str.append(":").append(std::to_string(dbc->connection_proxy->get_port()));
}

CLUSTER_AWARE_METRICS_CONTAINER::report_metrics(
cluster_id_str, dbc->ds->gather_metrics_per_instance,
dbc->log_file ? dbc->log_file.get() : nullptr, dbc->id);
CLUSTER_AWARE_METRICS_CONTAINER::report_metrics(cluster_id_str, dbc->ds->gather_metrics_per_instance, dbc->log_file, dbc->id);
}

CHECK_HANDLE(hdbc);
Expand Down
2 changes: 1 addition & 1 deletion driver/connection_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ SQLRETURN CONNECTION_HANDLER::do_connect(DBC* dbc_ptr, DataSource* ds, bool fail
return dbc_ptr->connect(ds, failover_enabled);
}

CONNECTION_PROXY* CONNECTION_HANDLER::connect(const std::shared_ptr<HOST_INFO>& host_info, DataSource* ds) {
CONNECTION_PROXY* CONNECTION_HANDLER::connect(std::shared_ptr<HOST_INFO> host_info, DataSource* ds) {

if (dbc == nullptr || host_info == nullptr) {
return nullptr;
Expand Down
2 changes: 1 addition & 1 deletion driver/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class CONNECTION_HANDLER {
virtual ~CONNECTION_HANDLER();

virtual SQLRETURN do_connect(DBC* dbc_ptr, DataSource* ds, bool failover_enabled);
virtual CONNECTION_PROXY* connect(const std::shared_ptr<HOST_INFO>& host_info, DataSource* ds);
virtual CONNECTION_PROXY* connect(std::shared_ptr<HOST_INFO> host_info, DataSource* ds);
void update_connection(CONNECTION_PROXY* new_connection, const std::string& new_host_name);

private:
Expand Down
2 changes: 2 additions & 0 deletions driver/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#define __DRIVER_H__

#include <atomic>
#include <ctpl_stl.h>

#include "../MYODBC_MYSQL.h"
#include "../MYODBC_CONF.h"
Expand Down Expand Up @@ -593,6 +594,7 @@ struct ENV
std::list<DBC*> conn_list;
MYERROR error;
std::mutex lock;
ctpl::thread_pool failover_thread_pool;

ENV(SQLINTEGER ver) : odbc_ver(ver)
{}
Expand Down
45 changes: 26 additions & 19 deletions driver/failover.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,27 @@ class FAILOVER_READER_HANDLER {
FAILOVER_READER_HANDLER(
std::shared_ptr<TOPOLOGY_SERVICE> topology_service,
std::shared_ptr<CONNECTION_HANDLER> connection_handler,
ctpl::thread_pool& thread_pool,
int failover_timeout_ms, int failover_reader_connect_timeout,
bool enable_strict_reader_failover,
unsigned long dbc_id, bool enable_logging = false);

~FAILOVER_READER_HANDLER();

READER_FAILOVER_RESULT failover(
std::shared_ptr<READER_FAILOVER_RESULT> failover(
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> topology_info);

virtual READER_FAILOVER_RESULT get_reader_connection(
virtual std::shared_ptr<READER_FAILOVER_RESULT> get_reader_connection(
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> topology_info,
FAILOVER_SYNC& f_sync);
std::shared_ptr<FAILOVER_SYNC> f_sync);

std::vector<std::shared_ptr<HOST_INFO>> build_hosts_list(
const std::shared_ptr<CLUSTER_TOPOLOGY_INFO>& topology_info,
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> topology_info,
bool contain_writers);

READER_FAILOVER_RESULT get_connection_from_hosts(
std::shared_ptr<READER_FAILOVER_RESULT> get_connection_from_hosts(
std::vector<std::shared_ptr<HOST_INFO>> hosts_list,
FAILOVER_SYNC& global_sync);
std::shared_ptr<FAILOVER_SYNC> global_sync);

protected:
int reader_connect_timeout_ms = 30000; // 30 sec
Expand All @@ -104,6 +105,7 @@ class FAILOVER_READER_HANDLER {
bool enable_strict_reader_failover = false;
std::shared_ptr<FILE> logger = nullptr;
unsigned long dbc_id = 0;
ctpl::thread_pool& thread_pool;
};

// This struct holds results of Writer Failover Process.
Expand Down Expand Up @@ -135,10 +137,11 @@ class FAILOVER_WRITER_HANDLER {
std::shared_ptr<TOPOLOGY_SERVICE> topology_service,
std::shared_ptr<FAILOVER_READER_HANDLER> reader_handler,
std::shared_ptr<CONNECTION_HANDLER> connection_handler,
ctpl::thread_pool& thread_pool,
int writer_failover_timeout_ms, int read_topology_interval_ms,
int reconnect_writer_interval_ms, unsigned long dbc_id, bool enable_logging = false);
~FAILOVER_WRITER_HANDLER();
WRITER_FAILOVER_RESULT failover(
std::shared_ptr<WRITER_FAILOVER_RESULT> failover(
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> current_topology);

protected:
Expand All @@ -152,6 +155,7 @@ class FAILOVER_WRITER_HANDLER {
std::shared_ptr<FAILOVER_READER_HANDLER> reader_handler;
std::shared_ptr<FILE> logger = nullptr;
unsigned long dbc_id = 0;
ctpl::thread_pool& thread_pool;
};

class FAILOVER_HANDLER {
Expand Down Expand Up @@ -222,7 +226,7 @@ class FAILOVER {
bool is_writer_connected();

protected:
bool connect(const std::shared_ptr<HOST_INFO>& host_info);
bool connect(std::shared_ptr<HOST_INFO> host_info);
void sleep(int miliseconds);
void release_new_connection();
std::shared_ptr<CONNECTION_HANDLER> connection_handler;
Expand All @@ -241,9 +245,10 @@ class CONNECT_TO_READER_HANDLER : public FAILOVER {
~CONNECT_TO_READER_HANDLER();

void operator()(
int id,
std::shared_ptr<HOST_INFO> reader,
FAILOVER_SYNC& f_sync,
READER_FAILOVER_RESULT& result);
std::shared_ptr<FAILOVER_SYNC> f_sync,
std::shared_ptr<READER_FAILOVER_RESULT> result);
};

class RECONNECT_TO_WRITER_HANDLER : public FAILOVER {
Expand All @@ -255,16 +260,17 @@ class RECONNECT_TO_WRITER_HANDLER : public FAILOVER {
~RECONNECT_TO_WRITER_HANDLER();

void operator()(
int id,
std::shared_ptr<HOST_INFO> original_writer,
FAILOVER_SYNC& f_sync,
WRITER_FAILOVER_RESULT& result);
std::shared_ptr<FAILOVER_SYNC> f_sync,
std::shared_ptr<WRITER_FAILOVER_RESULT> result);

private:
int reconnect_interval_ms;

bool is_current_host_writer(
const std::shared_ptr<HOST_INFO>& original_writer,
const std::shared_ptr<CLUSTER_TOPOLOGY_INFO>& latest_topology);
std::shared_ptr<HOST_INFO> original_writer,
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> latest_topology);
};

class WAIT_NEW_WRITER_HANDLER : public FAILOVER {
Expand All @@ -278,21 +284,22 @@ class WAIT_NEW_WRITER_HANDLER : public FAILOVER {
~WAIT_NEW_WRITER_HANDLER();

void operator()(
int id,
std::shared_ptr<HOST_INFO> original_writer,
FAILOVER_SYNC& f_sync,
WRITER_FAILOVER_RESULT& result);
std::shared_ptr<FAILOVER_SYNC> f_sync,
std::shared_ptr<WRITER_FAILOVER_RESULT> result);

private:
// TODO - initialize in constructor and define constant for default value
int read_topology_interval_ms = 5000;
std::shared_ptr<FAILOVER_READER_HANDLER> reader_handler;
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> current_topology;
CONNECTION_PROXY* reader_connection = nullptr; // To retrieve latest topology
std::shared_ptr<HOST_INFO> current_reader_host;
std::shared_ptr<HOST_INFO> current_reader_host = nullptr;

void refresh_topology_and_connect_to_new_writer(
std::shared_ptr<HOST_INFO> original_writer, FAILOVER_SYNC& f_sync);
void connect_to_reader(FAILOVER_SYNC& f_sync);
std::shared_ptr<HOST_INFO> original_writer, std::shared_ptr<FAILOVER_SYNC> f_sync);
void connect_to_reader(std::shared_ptr<FAILOVER_SYNC> f_sync);
bool connect_to_writer(std::shared_ptr<HOST_INFO> writer_candidate);
void clean_up_reader_connection();
};
Expand Down
23 changes: 13 additions & 10 deletions driver/failover_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ FAILOVER_HANDLER::FAILOVER_HANDLER(DBC* dbc, DataSource* ds,
this->connection_handler = connection_handler;

this->failover_reader_handler = std::make_shared<FAILOVER_READER_HANDLER>(
this->topology_service, this->connection_handler, ds->failover_timeout,
this->topology_service, this->connection_handler, dbc->env->failover_thread_pool, ds->failover_timeout,
ds->failover_reader_connect_timeout, ds->enable_strict_reader_failover,
dbc->id, ds->save_queries);
this->failover_writer_handler = std::make_shared<FAILOVER_WRITER_HANDLER>(
this->topology_service, this->failover_reader_handler,
this->connection_handler, ds->failover_timeout,
this->connection_handler, dbc->env->failover_thread_pool, ds->failover_timeout,
ds->failover_topology_refresh_rate,
ds->failover_writer_reconnect_interval, dbc->id, ds->save_queries);
this->metrics_container = metrics_container;
Expand Down Expand Up @@ -128,7 +128,7 @@ SQLRETURN FAILOVER_HANDLER::init_cluster_info() {
host_patterns = parse_host_list(hp_str.c_str(), port);
} catch (std::string&) {
err << "Invalid host pattern: '" << hp_str << "' - the value could not be parsed";
MYLOG_TRACE(dbc->log_file.get(), dbc->id, err.str().c_str());
MYLOG_TRACE(dbc->log_file, dbc->id, err.str().c_str());
throw std::runtime_error(err.str());
}

Expand Down Expand Up @@ -388,6 +388,9 @@ SQLRETURN FAILOVER_HANDLER::create_connection_and_initialize_topology() {
network_timeout != ds->write_timeout)) {
rc = reconnect(true);
}
if (is_failover_enabled()) {
this->dbc->env->failover_thread_pool.resize(current_topology->total_hosts());
}
}

return rc;
Expand Down Expand Up @@ -471,9 +474,9 @@ bool FAILOVER_HANDLER::failover_to_reader(const char*& new_error_code, const cha
MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] Starting reader failover procedure.");
auto result = failover_reader_handler->failover(current_topology);

if (result.connected) {
current_host = result.new_host;
connection_handler->update_connection(result.new_connection, current_host->get_host());
if (result->connected) {
current_host = result->new_host;
connection_handler->update_connection(result->new_connection, current_host->get_host());
new_error_code = "08S02";
error_msg = "The active SQL connection has changed.";
MYLOG_DBC_TRACE(dbc,
Expand All @@ -494,20 +497,20 @@ bool FAILOVER_HANDLER::failover_to_writer(const char*& new_error_code, const cha
MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] Starting writer failover procedure.");
auto result = failover_writer_handler->failover(current_topology);

if (!result.connected) {
if (!result->connected) {
MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] Unable to establish SQL connection to writer node.");
new_error_code = "08S01";
error_msg = "The active SQL connection was lost.";
return false;
}
if (result.is_new_host) {
if (result->is_new_host) {
// connected to a new writer host; take it over
current_topology = result.new_topology;
current_topology = result->new_topology;
current_host = current_topology->get_writer();
}

connection_handler->update_connection(
result.new_connection, result.new_topology->get_writer()->get_host());
result->new_connection, result->new_topology->get_writer()->get_host());

new_error_code = "08S02";
error_msg = "The active SQL connection has changed.";
Expand Down
Loading

0 comments on commit 71ce777

Please sign in to comment.