Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix failover timeout not obeyed #138

Merged
merged 33 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
0f0b6c6
replace async future with packed_task and thread to avoid blocking fu…
yanw-bq Apr 18, 2023
1924ab8
fix unit tests
yanw-bq Apr 18, 2023
75cefd9
add template arguments for std::packaged_task
yanw-bq Apr 18, 2023
8ead084
missing template arguments
yanw-bq Apr 18, 2023
91ddb40
use thread pool to run failover and wait for all when driver unloads,…
yanw-bq Apr 19, 2023
07405ea
increase thread pool size
yanw-bq Apr 19, 2023
3f69598
fix reader failover where failed result overwrite successful result
yanw-bq Apr 19, 2023
c870e56
not stop thread pool on free env handle
yanw-bq Apr 20, 2023
479a1c5
redirect maven central and stop thread pool in teardown
yanw-bq Apr 20, 2023
e0017bf
maven central
yanw-bq Apr 20, 2023
e35602f
add thread id to logs
yanw-bq Apr 20, 2023
f4e4db4
move future valid check before wait for
yanw-bq Apr 20, 2023
d222b09
move future valid check before wait_for() for writer failover
yanw-bq Apr 20, 2023
b0c6955
mark logger as extern
yanw-bq Apr 20, 2023
9f79d3a
test diable logs
yanw-bq Apr 20, 2023
a14e2e5
test failover_integration_test first
yanw-bq Apr 20, 2023
0c98d9c
test
yanw-bq Apr 20, 2023
1aa0cad
debug gh action
yanw-bq Apr 20, 2023
d4ecb66
make env var available for debug session
yanw-bq Apr 20, 2023
254cb1a
debug in docker
yanw-bq Apr 21, 2023
2ec1ae5
refactor integration tests to use shared pointer for rds client
yanw-bq Apr 21, 2023
410964c
make rds client back to non static
yanw-bq Apr 21, 2023
5e30e6a
fix build
yanw-bq Apr 21, 2023
473d90f
reset rds client in teardown
yanw-bq Apr 21, 2023
08eed0a
Revert "refactor integration tests to use shared pointer for rds client"
yanw-bq Apr 21, 2023
6b30d0d
disable remote debug
yanw-bq Apr 21, 2023
158c5d6
fix build
yanw-bq Apr 21, 2023
8fd5078
debug
yanw-bq Apr 21, 2023
403054e
specifically call thread pool stop in myodbc_end
yanw-bq Apr 21, 2023
e839797
move failover thread pool inside env
yanw-bq Apr 21, 2023
8853c62
fix unit test
yanw-bq Apr 21, 2023
646ee2b
fix build
yanw-bq Apr 21, 2023
a1d50d3
cleanup
yanw-bq Apr 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/dockerized.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ on:
- '**/*.md'
- '**/*.jpg'
- '**/README.txt'
- '**/LICENSE.txt'
- 'docs/**'
- 'ISSUE_TEMPLATE/**'
- '**/remove-old-artifacts.yml'
Expand Down Expand Up @@ -179,7 +180,7 @@ jobs:
echo "TEMP_AWS_ACCESS_KEY_ID=${creds[0]}" >> $GITHUB_ENV
echo "TEMP_AWS_SECRET_ACCESS_KEY=${creds[1]}" >> $GITHUB_ENV
echo "TEMP_AWS_SESSION_TOKEN=${creds[2]}" >> $GITHUB_ENV

- name: 'Run Integration Tests'
working-directory: ${{ github.workspace }}/testframework
run: |
Expand Down
2 changes: 1 addition & 1 deletion driver/cluster_aware_metrics_container.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void CLUSTER_AWARE_METRICS_CONTAINER::set_gather_metric(bool gather) {
this->can_gather = gather;
}

void CLUSTER_AWARE_METRICS_CONTAINER::report_metrics(std::string conn_url, bool for_instances, FILE* log, unsigned long dbc_id) {
void CLUSTER_AWARE_METRICS_CONTAINER::report_metrics(std::string conn_url, bool for_instances, std::shared_ptr<FILE> log, unsigned long dbc_id) {
if (!log) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion driver/cluster_aware_metrics_container.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class CLUSTER_AWARE_METRICS_CONTAINER {

void set_gather_metric(bool gather);

static void report_metrics(std::string conn_url, bool for_instances, FILE* log, unsigned long dbc_id);
static void report_metrics(std::string conn_url, bool for_instances, std::shared_ptr<FILE> log, unsigned long dbc_id);
static std::string report_metrics(std::string conn_url, bool for_instances);
static void reset_metrics();

Expand Down
8 changes: 3 additions & 5 deletions driver/connect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,15 +282,15 @@ std::shared_ptr<HOST_INFO> get_host_info_from_ds(DataSource* ds) {
} catch (std::string &) {
err << "Invalid server '" << ds->server8 << "'.";
if (ds->save_queries) {
MYLOG_TRACE(init_log_file().get(), 0, err.str().c_str());
MYLOG_TRACE(init_log_file(), 0, err.str().c_str());
}
throw std::runtime_error(err.str());
}

if (hosts.size() == 0) {
err << "No host was retrieved from the data source.";
if (ds->save_queries) {
MYLOG_TRACE(init_log_file().get(), 0, err.str().c_str());
MYLOG_TRACE(init_log_file(), 0, err.str().c_str());
}
throw std::runtime_error(err.str());
}
Expand Down Expand Up @@ -1448,9 +1448,7 @@ SQLRETURN SQL_API SQLDisconnect(SQLHDBC hdbc)
cluster_id_str.append(":").append(std::to_string(dbc->connection_proxy->get_port()));
}

CLUSTER_AWARE_METRICS_CONTAINER::report_metrics(
cluster_id_str, dbc->ds->gather_metrics_per_instance,
dbc->log_file ? dbc->log_file.get() : nullptr, dbc->id);
CLUSTER_AWARE_METRICS_CONTAINER::report_metrics(cluster_id_str, dbc->ds->gather_metrics_per_instance, dbc->log_file, dbc->id);
}

CHECK_HANDLE(hdbc);
Expand Down
2 changes: 1 addition & 1 deletion driver/connection_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ SQLRETURN CONNECTION_HANDLER::do_connect(DBC* dbc_ptr, DataSource* ds, bool fail
return dbc_ptr->connect(ds, failover_enabled);
}

CONNECTION_PROXY* CONNECTION_HANDLER::connect(const std::shared_ptr<HOST_INFO>& host_info, DataSource* ds) {
CONNECTION_PROXY* CONNECTION_HANDLER::connect(std::shared_ptr<HOST_INFO> host_info, DataSource* ds) {

if (dbc == nullptr || host_info == nullptr) {
return nullptr;
Expand Down
2 changes: 1 addition & 1 deletion driver/connection_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class CONNECTION_HANDLER {
virtual ~CONNECTION_HANDLER();

virtual SQLRETURN do_connect(DBC* dbc_ptr, DataSource* ds, bool failover_enabled);
virtual CONNECTION_PROXY* connect(const std::shared_ptr<HOST_INFO>& host_info, DataSource* ds);
virtual CONNECTION_PROXY* connect(std::shared_ptr<HOST_INFO> host_info, DataSource* ds);
void update_connection(CONNECTION_PROXY* new_connection, const std::string& new_host_name);

private:
Expand Down
2 changes: 2 additions & 0 deletions driver/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#define __DRIVER_H__

#include <atomic>
#include <ctpl_stl.h>

#include "../MYODBC_MYSQL.h"
#include "../MYODBC_CONF.h"
Expand Down Expand Up @@ -593,6 +594,7 @@ struct ENV
std::list<DBC*> conn_list;
MYERROR error;
std::mutex lock;
ctpl::thread_pool failover_thread_pool;

ENV(SQLINTEGER ver) : odbc_ver(ver)
{}
Expand Down
45 changes: 26 additions & 19 deletions driver/failover.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,27 @@ class FAILOVER_READER_HANDLER {
FAILOVER_READER_HANDLER(
std::shared_ptr<TOPOLOGY_SERVICE> topology_service,
std::shared_ptr<CONNECTION_HANDLER> connection_handler,
ctpl::thread_pool& thread_pool,
int failover_timeout_ms, int failover_reader_connect_timeout,
bool enable_strict_reader_failover,
unsigned long dbc_id, bool enable_logging = false);

~FAILOVER_READER_HANDLER();

READER_FAILOVER_RESULT failover(
std::shared_ptr<READER_FAILOVER_RESULT> failover(
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> topology_info);

virtual READER_FAILOVER_RESULT get_reader_connection(
virtual std::shared_ptr<READER_FAILOVER_RESULT> get_reader_connection(
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> topology_info,
FAILOVER_SYNC& f_sync);
std::shared_ptr<FAILOVER_SYNC> f_sync);

std::vector<std::shared_ptr<HOST_INFO>> build_hosts_list(
const std::shared_ptr<CLUSTER_TOPOLOGY_INFO>& topology_info,
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> topology_info,
bool contain_writers);

READER_FAILOVER_RESULT get_connection_from_hosts(
std::shared_ptr<READER_FAILOVER_RESULT> get_connection_from_hosts(
std::vector<std::shared_ptr<HOST_INFO>> hosts_list,
FAILOVER_SYNC& global_sync);
std::shared_ptr<FAILOVER_SYNC> global_sync);

protected:
int reader_connect_timeout_ms = 30000; // 30 sec
Expand All @@ -104,6 +105,7 @@ class FAILOVER_READER_HANDLER {
bool enable_strict_reader_failover = false;
std::shared_ptr<FILE> logger = nullptr;
unsigned long dbc_id = 0;
ctpl::thread_pool& thread_pool;
};

// This struct holds results of Writer Failover Process.
Expand Down Expand Up @@ -135,10 +137,11 @@ class FAILOVER_WRITER_HANDLER {
std::shared_ptr<TOPOLOGY_SERVICE> topology_service,
std::shared_ptr<FAILOVER_READER_HANDLER> reader_handler,
std::shared_ptr<CONNECTION_HANDLER> connection_handler,
ctpl::thread_pool& thread_pool,
int writer_failover_timeout_ms, int read_topology_interval_ms,
int reconnect_writer_interval_ms, unsigned long dbc_id, bool enable_logging = false);
~FAILOVER_WRITER_HANDLER();
WRITER_FAILOVER_RESULT failover(
std::shared_ptr<WRITER_FAILOVER_RESULT> failover(
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> current_topology);

protected:
Expand All @@ -152,6 +155,7 @@ class FAILOVER_WRITER_HANDLER {
std::shared_ptr<FAILOVER_READER_HANDLER> reader_handler;
std::shared_ptr<FILE> logger = nullptr;
unsigned long dbc_id = 0;
ctpl::thread_pool& thread_pool;
};

class FAILOVER_HANDLER {
Expand Down Expand Up @@ -222,7 +226,7 @@ class FAILOVER {
bool is_writer_connected();

protected:
bool connect(const std::shared_ptr<HOST_INFO>& host_info);
bool connect(std::shared_ptr<HOST_INFO> host_info);
void sleep(int miliseconds);
void release_new_connection();
std::shared_ptr<CONNECTION_HANDLER> connection_handler;
Expand All @@ -241,9 +245,10 @@ class CONNECT_TO_READER_HANDLER : public FAILOVER {
~CONNECT_TO_READER_HANDLER();

void operator()(
int id,
std::shared_ptr<HOST_INFO> reader,
FAILOVER_SYNC& f_sync,
READER_FAILOVER_RESULT& result);
std::shared_ptr<FAILOVER_SYNC> f_sync,
std::shared_ptr<READER_FAILOVER_RESULT> result);
};

class RECONNECT_TO_WRITER_HANDLER : public FAILOVER {
Expand All @@ -255,16 +260,17 @@ class RECONNECT_TO_WRITER_HANDLER : public FAILOVER {
~RECONNECT_TO_WRITER_HANDLER();

void operator()(
int id,
std::shared_ptr<HOST_INFO> original_writer,
FAILOVER_SYNC& f_sync,
WRITER_FAILOVER_RESULT& result);
std::shared_ptr<FAILOVER_SYNC> f_sync,
std::shared_ptr<WRITER_FAILOVER_RESULT> result);

private:
int reconnect_interval_ms;

bool is_current_host_writer(
const std::shared_ptr<HOST_INFO>& original_writer,
const std::shared_ptr<CLUSTER_TOPOLOGY_INFO>& latest_topology);
std::shared_ptr<HOST_INFO> original_writer,
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> latest_topology);
};

class WAIT_NEW_WRITER_HANDLER : public FAILOVER {
Expand All @@ -278,21 +284,22 @@ class WAIT_NEW_WRITER_HANDLER : public FAILOVER {
~WAIT_NEW_WRITER_HANDLER();

void operator()(
int id,
std::shared_ptr<HOST_INFO> original_writer,
FAILOVER_SYNC& f_sync,
WRITER_FAILOVER_RESULT& result);
std::shared_ptr<FAILOVER_SYNC> f_sync,
std::shared_ptr<WRITER_FAILOVER_RESULT> result);

private:
// TODO - initialize in constructor and define constant for default value
int read_topology_interval_ms = 5000;
std::shared_ptr<FAILOVER_READER_HANDLER> reader_handler;
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> current_topology;
CONNECTION_PROXY* reader_connection = nullptr; // To retrieve latest topology
std::shared_ptr<HOST_INFO> current_reader_host;
std::shared_ptr<HOST_INFO> current_reader_host = nullptr;

void refresh_topology_and_connect_to_new_writer(
std::shared_ptr<HOST_INFO> original_writer, FAILOVER_SYNC& f_sync);
void connect_to_reader(FAILOVER_SYNC& f_sync);
std::shared_ptr<HOST_INFO> original_writer, std::shared_ptr<FAILOVER_SYNC> f_sync);
void connect_to_reader(std::shared_ptr<FAILOVER_SYNC> f_sync);
bool connect_to_writer(std::shared_ptr<HOST_INFO> writer_candidate);
void clean_up_reader_connection();
};
Expand Down
23 changes: 13 additions & 10 deletions driver/failover_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ FAILOVER_HANDLER::FAILOVER_HANDLER(DBC* dbc, DataSource* ds,
this->connection_handler = connection_handler;

this->failover_reader_handler = std::make_shared<FAILOVER_READER_HANDLER>(
this->topology_service, this->connection_handler, ds->failover_timeout,
this->topology_service, this->connection_handler, dbc->env->failover_thread_pool, ds->failover_timeout,
ds->failover_reader_connect_timeout, ds->enable_strict_reader_failover,
dbc->id, ds->save_queries);
this->failover_writer_handler = std::make_shared<FAILOVER_WRITER_HANDLER>(
this->topology_service, this->failover_reader_handler,
this->connection_handler, ds->failover_timeout,
this->connection_handler, dbc->env->failover_thread_pool, ds->failover_timeout,
ds->failover_topology_refresh_rate,
ds->failover_writer_reconnect_interval, dbc->id, ds->save_queries);
this->metrics_container = metrics_container;
Expand Down Expand Up @@ -128,7 +128,7 @@ SQLRETURN FAILOVER_HANDLER::init_cluster_info() {
host_patterns = parse_host_list(hp_str.c_str(), port);
} catch (std::string&) {
err << "Invalid host pattern: '" << hp_str << "' - the value could not be parsed";
MYLOG_TRACE(dbc->log_file.get(), dbc->id, err.str().c_str());
MYLOG_TRACE(dbc->log_file, dbc->id, err.str().c_str());
throw std::runtime_error(err.str());
}

Expand Down Expand Up @@ -388,6 +388,9 @@ SQLRETURN FAILOVER_HANDLER::create_connection_and_initialize_topology() {
network_timeout != ds->write_timeout)) {
rc = reconnect(true);
}
if (is_failover_enabled()) {
this->dbc->env->failover_thread_pool.resize(current_topology->total_hosts());
}
}

return rc;
Expand Down Expand Up @@ -471,9 +474,9 @@ bool FAILOVER_HANDLER::failover_to_reader(const char*& new_error_code, const cha
MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] Starting reader failover procedure.");
auto result = failover_reader_handler->failover(current_topology);

if (result.connected) {
current_host = result.new_host;
connection_handler->update_connection(result.new_connection, current_host->get_host());
if (result->connected) {
current_host = result->new_host;
connection_handler->update_connection(result->new_connection, current_host->get_host());
new_error_code = "08S02";
error_msg = "The active SQL connection has changed.";
MYLOG_DBC_TRACE(dbc,
Expand All @@ -494,20 +497,20 @@ bool FAILOVER_HANDLER::failover_to_writer(const char*& new_error_code, const cha
MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] Starting writer failover procedure.");
auto result = failover_writer_handler->failover(current_topology);

if (!result.connected) {
if (!result->connected) {
MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] Unable to establish SQL connection to writer node.");
new_error_code = "08S01";
error_msg = "The active SQL connection was lost.";
return false;
}
if (result.is_new_host) {
if (result->is_new_host) {
// connected to a new writer host; take it over
current_topology = result.new_topology;
current_topology = result->new_topology;
current_host = current_topology->get_writer();
}

connection_handler->update_connection(
result.new_connection, result.new_topology->get_writer()->get_host());
result->new_connection, result->new_topology->get_writer()->get_host());

new_error_code = "08S02";
error_msg = "The active SQL connection has changed.";
Expand Down
Loading