diff --git a/.github/workflows/dockerized.yml b/.github/workflows/dockerized.yml index 8f8a8a1da..a046b959e 100644 --- a/.github/workflows/dockerized.yml +++ b/.github/workflows/dockerized.yml @@ -11,6 +11,7 @@ on: - '**/*.md' - '**/*.jpg' - '**/README.txt' + - '**/LICENSE.txt' - 'docs/**' - 'ISSUE_TEMPLATE/**' - '**/remove-old-artifacts.yml' @@ -179,7 +180,7 @@ jobs: echo "TEMP_AWS_ACCESS_KEY_ID=${creds[0]}" >> $GITHUB_ENV echo "TEMP_AWS_SECRET_ACCESS_KEY=${creds[1]}" >> $GITHUB_ENV echo "TEMP_AWS_SESSION_TOKEN=${creds[2]}" >> $GITHUB_ENV - + - name: 'Run Integration Tests' working-directory: ${{ github.workspace }}/testframework run: | diff --git a/driver/cluster_aware_metrics_container.cc b/driver/cluster_aware_metrics_container.cc index ce51ff6c9..7dc470f33 100644 --- a/driver/cluster_aware_metrics_container.cc +++ b/driver/cluster_aware_metrics_container.cc @@ -79,7 +79,7 @@ void CLUSTER_AWARE_METRICS_CONTAINER::set_gather_metric(bool gather) { this->can_gather = gather; } -void CLUSTER_AWARE_METRICS_CONTAINER::report_metrics(std::string conn_url, bool for_instances, FILE* log, unsigned long dbc_id) { +void CLUSTER_AWARE_METRICS_CONTAINER::report_metrics(std::string conn_url, bool for_instances, std::shared_ptr log, unsigned long dbc_id) { if (!log) { return; } diff --git a/driver/cluster_aware_metrics_container.h b/driver/cluster_aware_metrics_container.h index 72086afcb..405df8e3b 100644 --- a/driver/cluster_aware_metrics_container.h +++ b/driver/cluster_aware_metrics_container.h @@ -60,7 +60,7 @@ class CLUSTER_AWARE_METRICS_CONTAINER { void set_gather_metric(bool gather); - static void report_metrics(std::string conn_url, bool for_instances, FILE* log, unsigned long dbc_id); + static void report_metrics(std::string conn_url, bool for_instances, std::shared_ptr log, unsigned long dbc_id); static std::string report_metrics(std::string conn_url, bool for_instances); static void reset_metrics(); diff --git a/driver/connect.cc b/driver/connect.cc index cd2b6f76b..88d936d8c 100644 --- a/driver/connect.cc +++ b/driver/connect.cc @@ -285,7 +285,7 @@ std::shared_ptr get_host_info_from_ds(DataSource* ds) { } catch (std::string &) { err << "Invalid server '" << ds->server8 << "'."; if (ds->save_queries) { - MYLOG_TRACE(init_log_file().get(), 0, err.str().c_str()); + MYLOG_TRACE(init_log_file(), 0, err.str().c_str()); } throw std::runtime_error(err.str()); } @@ -293,7 +293,7 @@ std::shared_ptr get_host_info_from_ds(DataSource* ds) { if (hosts.size() == 0) { err << "No host was retrieved from the data source."; if (ds->save_queries) { - MYLOG_TRACE(init_log_file().get(), 0, err.str().c_str()); + MYLOG_TRACE(init_log_file(), 0, err.str().c_str()); } throw std::runtime_error(err.str()); } @@ -1535,9 +1535,7 @@ SQLRETURN SQL_API SQLDisconnect(SQLHDBC hdbc) cluster_id_str.append(":").append(std::to_string(dbc->connection_proxy->get_port())); } - CLUSTER_AWARE_METRICS_CONTAINER::report_metrics( - cluster_id_str, dbc->ds->gather_metrics_per_instance, - dbc->log_file ? 
dbc->log_file.get() : nullptr, dbc->id); + CLUSTER_AWARE_METRICS_CONTAINER::report_metrics(cluster_id_str, dbc->ds->gather_metrics_per_instance, dbc->log_file, dbc->id); } CHECK_HANDLE(hdbc); diff --git a/driver/connection_handler.cc b/driver/connection_handler.cc index d934b9f0c..b2eac55d0 100644 --- a/driver/connection_handler.cc +++ b/driver/connection_handler.cc @@ -60,7 +60,7 @@ SQLRETURN CONNECTION_HANDLER::do_connect(DBC* dbc_ptr, DataSource* ds, bool fail return dbc_ptr->connect(ds, failover_enabled); } -CONNECTION_PROXY* CONNECTION_HANDLER::connect(const std::shared_ptr& host_info, DataSource* ds) { +CONNECTION_PROXY* CONNECTION_HANDLER::connect(std::shared_ptr host_info, DataSource* ds) { if (dbc == nullptr || host_info == nullptr) { return nullptr; diff --git a/driver/connection_handler.h b/driver/connection_handler.h index d5423527a..1753c95c0 100644 --- a/driver/connection_handler.h +++ b/driver/connection_handler.h @@ -52,7 +52,7 @@ class CONNECTION_HANDLER { virtual ~CONNECTION_HANDLER(); virtual SQLRETURN do_connect(DBC* dbc_ptr, DataSource* ds, bool failover_enabled); - virtual CONNECTION_PROXY* connect(const std::shared_ptr& host_info, DataSource* ds); + virtual CONNECTION_PROXY* connect(std::shared_ptr host_info, DataSource* ds); void update_connection(CONNECTION_PROXY* new_connection, const std::string& new_host_name); private: diff --git a/driver/driver.h b/driver/driver.h index d0aa2d9c9..d9f2f6083 100644 --- a/driver/driver.h +++ b/driver/driver.h @@ -37,6 +37,7 @@ #define __DRIVER_H__ #include +#include #include "../MYODBC_MYSQL.h" #include "../MYODBC_CONF.h" @@ -593,6 +594,7 @@ struct ENV std::list conn_list; MYERROR error; std::mutex lock; + ctpl::thread_pool failover_thread_pool; ENV(SQLINTEGER ver) : odbc_ver(ver) {} diff --git a/driver/failover.h b/driver/failover.h index 82c45cd54..248ed1084 100644 --- a/driver/failover.h +++ b/driver/failover.h @@ -72,26 +72,27 @@ class FAILOVER_READER_HANDLER { FAILOVER_READER_HANDLER( std::shared_ptr topology_service, std::shared_ptr connection_handler, + ctpl::thread_pool& thread_pool, int failover_timeout_ms, int failover_reader_connect_timeout, bool enable_strict_reader_failover, unsigned long dbc_id, bool enable_logging = false); ~FAILOVER_READER_HANDLER(); - READER_FAILOVER_RESULT failover( + std::shared_ptr failover( std::shared_ptr topology_info); - virtual READER_FAILOVER_RESULT get_reader_connection( + virtual std::shared_ptr get_reader_connection( std::shared_ptr topology_info, - FAILOVER_SYNC& f_sync); + std::shared_ptr f_sync); std::vector> build_hosts_list( - const std::shared_ptr& topology_info, + std::shared_ptr topology_info, bool contain_writers); - READER_FAILOVER_RESULT get_connection_from_hosts( + std::shared_ptr get_connection_from_hosts( std::vector> hosts_list, - FAILOVER_SYNC& global_sync); + std::shared_ptr global_sync); protected: int reader_connect_timeout_ms = 30000; // 30 sec @@ -104,6 +105,7 @@ class FAILOVER_READER_HANDLER { bool enable_strict_reader_failover = false; std::shared_ptr logger = nullptr; unsigned long dbc_id = 0; + ctpl::thread_pool& thread_pool; }; // This struct holds results of Writer Failover Process. 
@@ -135,10 +137,11 @@ class FAILOVER_WRITER_HANDLER { std::shared_ptr topology_service, std::shared_ptr reader_handler, std::shared_ptr connection_handler, + ctpl::thread_pool& thread_pool, int writer_failover_timeout_ms, int read_topology_interval_ms, int reconnect_writer_interval_ms, unsigned long dbc_id, bool enable_logging = false); ~FAILOVER_WRITER_HANDLER(); - WRITER_FAILOVER_RESULT failover( + std::shared_ptr failover( std::shared_ptr current_topology); protected: @@ -152,6 +155,7 @@ class FAILOVER_WRITER_HANDLER { std::shared_ptr reader_handler; std::shared_ptr logger = nullptr; unsigned long dbc_id = 0; + ctpl::thread_pool& thread_pool; }; class FAILOVER_HANDLER { @@ -222,7 +226,7 @@ class FAILOVER { bool is_writer_connected(); protected: - bool connect(const std::shared_ptr& host_info); + bool connect(std::shared_ptr host_info); void sleep(int miliseconds); void release_new_connection(); std::shared_ptr connection_handler; @@ -241,9 +245,10 @@ class CONNECT_TO_READER_HANDLER : public FAILOVER { ~CONNECT_TO_READER_HANDLER(); void operator()( + int id, std::shared_ptr reader, - FAILOVER_SYNC& f_sync, - READER_FAILOVER_RESULT& result); + std::shared_ptr f_sync, + std::shared_ptr result); }; class RECONNECT_TO_WRITER_HANDLER : public FAILOVER { @@ -255,16 +260,17 @@ class RECONNECT_TO_WRITER_HANDLER : public FAILOVER { ~RECONNECT_TO_WRITER_HANDLER(); void operator()( + int id, std::shared_ptr original_writer, - FAILOVER_SYNC& f_sync, - WRITER_FAILOVER_RESULT& result); + std::shared_ptr f_sync, + std::shared_ptr result); private: int reconnect_interval_ms; bool is_current_host_writer( - const std::shared_ptr& original_writer, - const std::shared_ptr& latest_topology); + std::shared_ptr original_writer, + std::shared_ptr latest_topology); }; class WAIT_NEW_WRITER_HANDLER : public FAILOVER { @@ -278,9 +284,10 @@ class WAIT_NEW_WRITER_HANDLER : public FAILOVER { ~WAIT_NEW_WRITER_HANDLER(); void operator()( + int id, std::shared_ptr original_writer, - FAILOVER_SYNC& f_sync, - WRITER_FAILOVER_RESULT& result); + std::shared_ptr f_sync, + std::shared_ptr result); private: // TODO - initialize in constructor and define constant for default value @@ -288,11 +295,11 @@ class WAIT_NEW_WRITER_HANDLER : public FAILOVER { std::shared_ptr reader_handler; std::shared_ptr current_topology; CONNECTION_PROXY* reader_connection = nullptr; // To retrieve latest topology - std::shared_ptr current_reader_host; + std::shared_ptr current_reader_host = nullptr; void refresh_topology_and_connect_to_new_writer( - std::shared_ptr original_writer, FAILOVER_SYNC& f_sync); - void connect_to_reader(FAILOVER_SYNC& f_sync); + std::shared_ptr original_writer, std::shared_ptr f_sync); + void connect_to_reader(std::shared_ptr f_sync); bool connect_to_writer(std::shared_ptr writer_candidate); void clean_up_reader_connection(); }; diff --git a/driver/failover_handler.cc b/driver/failover_handler.cc index 3e60b6240..dc6bf757c 100644 --- a/driver/failover_handler.cc +++ b/driver/failover_handler.cc @@ -80,12 +80,12 @@ FAILOVER_HANDLER::FAILOVER_HANDLER(DBC* dbc, DataSource* ds, this->connection_handler = connection_handler; this->failover_reader_handler = std::make_shared( - this->topology_service, this->connection_handler, ds->failover_timeout, + this->topology_service, this->connection_handler, dbc->env->failover_thread_pool, ds->failover_timeout, ds->failover_reader_connect_timeout, ds->enable_strict_reader_failover, dbc->id, ds->save_queries); this->failover_writer_handler = std::make_shared( this->topology_service, 
this->failover_reader_handler, - this->connection_handler, ds->failover_timeout, + this->connection_handler, dbc->env->failover_thread_pool, ds->failover_timeout, ds->failover_topology_refresh_rate, ds->failover_writer_reconnect_interval, dbc->id, ds->save_queries); this->metrics_container = metrics_container; @@ -128,7 +128,7 @@ SQLRETURN FAILOVER_HANDLER::init_cluster_info() { host_patterns = parse_host_list(hp_str.c_str(), port); } catch (std::string&) { err << "Invalid host pattern: '" << hp_str << "' - the value could not be parsed"; - MYLOG_TRACE(dbc->log_file.get(), dbc->id, err.str().c_str()); + MYLOG_TRACE(dbc->log_file, dbc->id, err.str().c_str()); throw std::runtime_error(err.str()); } @@ -388,6 +388,9 @@ SQLRETURN FAILOVER_HANDLER::create_connection_and_initialize_topology() { network_timeout != ds->write_timeout)) { rc = reconnect(true); } + if (is_failover_enabled()) { + this->dbc->env->failover_thread_pool.resize(current_topology->total_hosts()); + } } return rc; @@ -471,9 +474,9 @@ bool FAILOVER_HANDLER::failover_to_reader(const char*& new_error_code, const cha MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] Starting reader failover procedure."); auto result = failover_reader_handler->failover(current_topology); - if (result.connected) { - current_host = result.new_host; - connection_handler->update_connection(result.new_connection, current_host->get_host()); + if (result->connected) { + current_host = result->new_host; + connection_handler->update_connection(result->new_connection, current_host->get_host()); new_error_code = "08S02"; error_msg = "The active SQL connection has changed."; MYLOG_DBC_TRACE(dbc, @@ -494,20 +497,20 @@ bool FAILOVER_HANDLER::failover_to_writer(const char*& new_error_code, const cha MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] Starting writer failover procedure."); auto result = failover_writer_handler->failover(current_topology); - if (!result.connected) { + if (!result->connected) { MYLOG_DBC_TRACE(dbc, "[FAILOVER_HANDLER] Unable to establish SQL connection to writer node."); new_error_code = "08S01"; error_msg = "The active SQL connection was lost."; return false; } - if (result.is_new_host) { + if (result->is_new_host) { // connected to a new writer host; take it over - current_topology = result.new_topology; + current_topology = result->new_topology; current_host = current_topology->get_writer(); } connection_handler->update_connection( - result.new_connection, result.new_topology->get_writer()->get_host()); + result->new_connection, result->new_topology->get_writer()->get_host()); new_error_code = "08S02"; error_msg = "The active SQL connection has changed."; diff --git a/driver/failover_reader_handler.cc b/driver/failover_reader_handler.cc index fc6d73d5b..2ec2f5ddc 100644 --- a/driver/failover_reader_handler.cc +++ b/driver/failover_reader_handler.cc @@ -39,11 +39,13 @@ FAILOVER_READER_HANDLER::FAILOVER_READER_HANDLER( std::shared_ptr topology_service, std::shared_ptr connection_handler, + ctpl::thread_pool& thread_pool, int failover_timeout_ms, int failover_reader_connect_timeout, bool enable_strict_reader_failover, unsigned long dbc_id, bool enable_logging) : topology_service{topology_service}, connection_handler{connection_handler}, + thread_pool{thread_pool}, max_failover_timeout_ms{failover_timeout_ms}, reader_connect_timeout_ms{failover_reader_connect_timeout}, enable_strict_reader_failover{enable_strict_reader_failover}, @@ -58,65 +60,80 @@ FAILOVER_READER_HANDLER::~FAILOVER_READER_HANDLER() {} // Function called to start the Reader Failover 
process. // This process will generate a list of available hosts: First readers that are up, then readers marked as down, then writers. // If it goes through the list and does not succeed to connect, it tries again, endlessly. -READER_FAILOVER_RESULT FAILOVER_READER_HANDLER::failover( +std::shared_ptr FAILOVER_READER_HANDLER::failover( std::shared_ptr current_topology) { - - READER_FAILOVER_RESULT reader_result(false, nullptr, nullptr); + auto empty_result = std::make_shared(false, nullptr, nullptr); if (!current_topology || current_topology->total_hosts() == 0) { - return reader_result; + return empty_result; } - FAILOVER_SYNC global_sync(1); + const auto start = std::chrono::steady_clock::now(); + auto global_sync = std::make_shared(1); - auto reader_result_future = std::async(std::launch::async, [=, &global_sync, &reader_result]() { - std::vector> hosts_list; - while (!global_sync.is_completed()) { - hosts_list = build_hosts_list(current_topology, !enable_strict_reader_failover); - reader_result = get_connection_from_hosts(hosts_list, global_sync); - if (reader_result.connected) { - global_sync.mark_as_complete(true); - return; + if (thread_pool.n_idle() == 0) { + MYLOG_TRACE(logger, dbc_id, "[FAILOVER_READER_HANDLER] Resizing thread pool to %d", thread_pool.size() + 1); + thread_pool.resize(thread_pool.size() + 1); + } + auto reader_result_future = thread_pool.push([=](int id) { + while (!global_sync->is_completed()) { + auto hosts_list = build_hosts_list(current_topology, !enable_strict_reader_failover); + auto reader_result = get_connection_from_hosts(hosts_list, global_sync); + if (reader_result->connected) { + global_sync->mark_as_complete(true); + return reader_result; } // TODO Think of changes to the strategy if it went // through all the hosts and did not connect. std::this_thread::sleep_for(std::chrono::seconds(READER_CONNECT_INTERVAL_SEC)); } + return empty_result; }); - global_sync.wait_and_complete(max_failover_timeout_ms); + // Wait for task complete signal with specified timeout + global_sync->wait_and_complete(max_failover_timeout_ms); + // Constantly polling for results until timeout + while (true) { + if (reader_result_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { + MYLOG_TRACE(logger, dbc_id, "[FAILOVER_READER_HANDLER] Reader failover finished."); + return reader_result_future.get(); + } - if (reader_result_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { - reader_result_future.get(); + // No result is ready, update remaining timeout + const auto duration = std::chrono::duration_cast(std::chrono::steady_clock::now() - start); + const auto remaining_wait_ms = max_failover_timeout_ms - duration.count(); + if (remaining_wait_ms <= 0) { + // Reader failover timed out + MYLOG_TRACE(logger, dbc_id, "[FAILOVER_READER_HANDLER] Reader failover timed out. Failed to connect to the reader instance."); + return empty_result; + } } - - return reader_result; } // Function to connect to a reader host. Often used to query/update the topology. // If it goes through the list of readers and fails to connect, it tries again, endlessly. // This function only tries to connect to reader hosts. -READER_FAILOVER_RESULT FAILOVER_READER_HANDLER::get_reader_connection( +std::shared_ptr FAILOVER_READER_HANDLER::get_reader_connection( std::shared_ptr topology_info, - FAILOVER_SYNC& f_sync) { + std::shared_ptr f_sync) { // We build a list of all readers, up then down, without writers. 
auto hosts = build_hosts_list(topology_info, false); - while (!f_sync.is_completed()) { + while (!f_sync->is_completed()) { auto reader_result = get_connection_from_hosts(hosts, f_sync); // TODO Think of changes to the strategy if it went through all the readers and did not connect. - if (reader_result.connected) { + if (reader_result->connected) { return reader_result; } } // Return a false result if the connection request has been cancelled. - return READER_FAILOVER_RESULT(false, nullptr, nullptr); + return std::make_shared(false, nullptr, nullptr); } // Function that reads the topology and builds a list of hosts to connect to, in order of priority. // boolean include_writers indicate whether one wants to append the writers to the end of the list or not. std::vector> FAILOVER_READER_HANDLER::build_hosts_list( - const std::shared_ptr& topology_info, + std::shared_ptr topology_info, bool include_writers) { std::vector> hosts_list; @@ -153,8 +170,8 @@ std::vector> FAILOVER_READER_HANDLER::build_hosts_lis return hosts_list; } -READER_FAILOVER_RESULT FAILOVER_READER_HANDLER::get_connection_from_hosts( - std::vector> hosts_list, FAILOVER_SYNC& global_sync) { +std::shared_ptr FAILOVER_READER_HANDLER::get_connection_from_hosts( + std::vector> hosts_list, std::shared_ptr global_sync) { size_t total_hosts = hosts_list.size(); size_t i = 0; @@ -162,64 +179,88 @@ READER_FAILOVER_RESULT FAILOVER_READER_HANDLER::get_connection_from_hosts( // This loop should end once it reaches the end of the list without a successful connection. // The function calling it already has a neverending loop looking for a connection. // Ending this loop will allow the calling function to update the list or change strategy if this failed. - while (!global_sync.is_completed() && i < total_hosts) { + while (!global_sync->is_completed() && i < total_hosts) { + const auto start = std::chrono::steady_clock::now(); + // This boolean verifies if the next host in the list is also the last, meaning there's no host for the second thread. 
- bool odd_hosts_number = (i + 1 == total_hosts); + const bool odd_hosts_number = (i + 1 == total_hosts); - FAILOVER_SYNC local_sync(1); + auto local_sync = std::make_shared(1); if (!odd_hosts_number) { - local_sync.increment_task(); + local_sync->increment_task(); } CONNECT_TO_READER_HANDLER first_connection_handler(connection_handler, topology_service, dbc_id, logger != nullptr); - std::future first_connection_future; - READER_FAILOVER_RESULT first_connection_result(false, nullptr, nullptr); + auto first_connection_result = std::make_shared(false, nullptr, nullptr); CONNECT_TO_READER_HANDLER second_connection_handler(connection_handler, topology_service, dbc_id, logger != nullptr); - std::future second_connection_future; - READER_FAILOVER_RESULT second_connection_result(false, nullptr, nullptr); + auto second_connection_result = std::make_shared(false, nullptr, nullptr); std::shared_ptr first_reader_host = hosts_list.at(i); - first_connection_future = std::async(std::launch::async, std::ref(first_connection_handler), - std::ref(first_reader_host), std::ref(local_sync), - std::ref(first_connection_result)); + if (thread_pool.n_idle() <= 1) { + int size = thread_pool.size() + 2 - thread_pool.n_idle(); + MYLOG_TRACE(logger, dbc_id, + "[FAILOVER_READER_HANDLER] Resizing thread pool to %d", size); + thread_pool.resize(size); + } + + auto first_result = thread_pool.push(std::move(first_connection_handler), first_reader_host, local_sync, first_connection_result); + std::future second_future; if (!odd_hosts_number) { - std::shared_ptr second_reader_host = hosts_list.at(i + 1); - second_connection_future = std::async(std::launch::async, std::ref(second_connection_handler), - std::ref(second_reader_host), std::ref(local_sync), - std::ref(second_connection_result)); + auto second_reader_host = hosts_list.at(i + 1); + second_future = thread_pool.push(std::move(second_connection_handler), second_reader_host, local_sync, second_connection_result); } - local_sync.wait_and_complete(reader_connect_timeout_ms); + // Wait for task complete signal with specified timeout + local_sync->wait_and_complete(reader_connect_timeout_ms); - if (first_connection_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { - first_connection_future.get(); - } - if (!odd_hosts_number && - second_connection_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { + // Constantly polling for results until timeout + while (true) { + // Check if first reader task result is ready + if (first_result.valid() && first_result.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { + first_result.get(); + if (first_connection_result->connected) { + MYLOG_TRACE(logger, dbc_id, + "[FAILOVER_READER_HANDLER] Connected to reader: %s", + first_connection_result->new_host->get_host_port_pair().c_str()); - second_connection_future.get(); - } + return first_connection_result; + } + } - if (first_connection_result.connected) { - MYLOG_TRACE(logger.get(), dbc_id, + // Check if second reader task result is ready if there is one + if (!odd_hosts_number && second_future.valid() && + second_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { + second_future.get(); + if (second_connection_result->connected) { + MYLOG_TRACE(logger, dbc_id, "[FAILOVER_READER_HANDLER] Connected to reader: %s", - first_connection_result.new_host->get_host_port_pair().c_str()); - return first_connection_result; - } else if (!odd_hosts_number && second_connection_result.connected) { - MYLOG_TRACE(logger.get(), 
dbc_id, - "[FAILOVER_READER_HANDLER] Connected to reader: %s", - second_connection_result.new_host->get_host_port_pair().c_str()); - return second_connection_result; + second_connection_result->new_host->get_host_port_pair().c_str()); + + return second_connection_result; + } + } + + // Results are ready but non has valid connection + if (!first_result.valid() && (odd_hosts_number || !second_future.valid())) { + break; + } + + // No result it ready, update remaining timeout + const auto duration = std::chrono::duration_cast(std::chrono::steady_clock::now() - start); + const auto remaining_wait_ms = reader_connect_timeout_ms - duration.count(); + if (remaining_wait_ms <= 0) { + // None has connected. We move on and try new hosts. + std::this_thread::sleep_for(std::chrono::seconds(READER_CONNECT_INTERVAL_SEC)); + break; + } } - // None has connected. We move on and try new hosts. i += 2; - std::this_thread::sleep_for(std::chrono::seconds(READER_CONNECT_INTERVAL_SEC)); } // The operation was either cancelled either reached the end of the list without connecting. - return READER_FAILOVER_RESULT(false, nullptr, nullptr); + return std::make_shared(false, nullptr, nullptr); } // *** CONNECT_TO_READER_HANDLER @@ -233,38 +274,41 @@ CONNECT_TO_READER_HANDLER::CONNECT_TO_READER_HANDLER( CONNECT_TO_READER_HANDLER::~CONNECT_TO_READER_HANDLER() {} void CONNECT_TO_READER_HANDLER::operator()( + int id, std::shared_ptr reader, - FAILOVER_SYNC& f_sync, - READER_FAILOVER_RESULT& result) { + std::shared_ptr f_sync, + std::shared_ptr result) { - if (reader && !f_sync.is_completed()) { + if (reader && !f_sync->is_completed()) { - MYLOG_TRACE(logger.get(), dbc_id, - "[CONNECT_TO_READER_HANDLER] Trying to connect to reader: %s", - reader->get_host_port_pair().c_str()); + MYLOG_TRACE(logger, dbc_id, + "Thread ID %d - [CONNECT_TO_READER_HANDLER] Trying to connect to reader: %s", + id, reader->get_host_port_pair().c_str()); if (connect(reader)) { topology_service->mark_host_up(reader); - if (f_sync.is_completed()) { + if (f_sync->is_completed()) { // If another thread finishes first, or both timeout, this thread is canceled. 
release_new_connection(); } else { - result = READER_FAILOVER_RESULT(true, reader, std::move(this->new_connection)); - f_sync.mark_as_complete(true); + result->connected = true; + result->new_host = reader; + result->new_connection = std::move(this->new_connection); + f_sync->mark_as_complete(true); MYLOG_TRACE( - logger.get(), dbc_id, - "[CONNECT_TO_READER_HANDLER] Connected to reader: %s", - reader->get_host_port_pair().c_str()); + logger, dbc_id, + "Thread ID %d - [CONNECT_TO_READER_HANDLER] Connected to reader: %s", + id, reader->get_host_port_pair().c_str()); return; } } else { topology_service->mark_host_down(reader); MYLOG_TRACE( - logger.get(), dbc_id, - "[CONNECT_TO_READER_HANDLER] Failed to connect to reader: %s", - reader->get_host_port_pair().c_str()); - if (!f_sync.is_completed()) { - f_sync.mark_as_complete(false); + logger, dbc_id, + "Thread ID %d - [CONNECT_TO_READER_HANDLER] Failed to connect to reader: %s", + id, reader->get_host_port_pair().c_str()); + if (!f_sync->is_completed()) { + f_sync->mark_as_complete(false); } } } diff --git a/driver/failover_writer_handler.cc b/driver/failover_writer_handler.cc index 0baa81e24..23d83d56d 100644 --- a/driver/failover_writer_handler.cc +++ b/driver/failover_writer_handler.cc @@ -86,7 +86,7 @@ bool FAILOVER::is_writer_connected() { return new_connection && new_connection->is_connected(); } -bool FAILOVER::connect(const std::shared_ptr& host_info) { +bool FAILOVER::connect(std::shared_ptr host_info) { new_connection = connection_handler->connect(host_info, nullptr); return is_writer_connected(); } @@ -116,17 +116,18 @@ RECONNECT_TO_WRITER_HANDLER::RECONNECT_TO_WRITER_HANDLER( RECONNECT_TO_WRITER_HANDLER::~RECONNECT_TO_WRITER_HANDLER() {} void RECONNECT_TO_WRITER_HANDLER::operator()( + int id, std::shared_ptr original_writer, - FAILOVER_SYNC& f_sync, - WRITER_FAILOVER_RESULT& result) { + std::shared_ptr f_sync, + std::shared_ptr result) { if (original_writer) { - MYLOG_TRACE(logger.get(), dbc_id, - "[RECONNECT_TO_WRITER_HANDLER] [TaskA] Attempting to " + MYLOG_TRACE(logger, dbc_id, + "Thread ID %d - [RECONNECT_TO_WRITER_HANDLER] [TaskA] Attempting to " "re-connect to the current writer instance: %s", - original_writer->get_host_port_pair().c_str()); + id, original_writer->get_host_port_pair().c_str()); - while (!f_sync.is_completed()) { + while (!f_sync->is_completed()) { if (connect(original_writer)) { auto latest_topology = topology_service->get_topology(new_connection, true); @@ -134,29 +135,31 @@ void RECONNECT_TO_WRITER_HANDLER::operator()( is_current_host_writer(original_writer, latest_topology)) { topology_service->mark_host_up(original_writer); - if (f_sync.is_completed()) { + if (f_sync->is_completed()) { break; } - result = WRITER_FAILOVER_RESULT(true, false, latest_topology, - std::move(new_connection)); - f_sync.mark_as_complete(true); - MYLOG_TRACE(logger.get(), dbc_id, "[RECONNECT_TO_WRITER_HANDLER] [TaskA] Finished"); + result->connected = true; + result->is_new_host = false; + result->new_topology = latest_topology; + result->new_connection = std::move(new_connection); + f_sync->mark_as_complete(true); + MYLOG_TRACE(logger, dbc_id, "Thread ID %d - [RECONNECT_TO_WRITER_HANDLER] [TaskA] Finished", id); return; } release_new_connection(); } sleep(reconnect_interval_ms); } - MYLOG_TRACE(logger.get(), dbc_id, "[RECONNECT_TO_WRITER_HANDLER] [TaskA] Cancelled"); + MYLOG_TRACE(logger, dbc_id, "Thread ID %d - [RECONNECT_TO_WRITER_HANDLER] [TaskA] Cancelled", id); } // Another thread finishes or both timeout, this thread is 
canceled release_new_connection(); - MYLOG_TRACE(logger.get(), dbc_id, "[RECONNECT_TO_WRITER_HANDLER] [TaskA] Finished"); + MYLOG_TRACE(logger, dbc_id, "Thread ID %d - [RECONNECT_TO_WRITER_HANDLER] [TaskA] Finished", id); } bool RECONNECT_TO_WRITER_HANDLER::is_current_host_writer( - const std::shared_ptr& original_writer, - const std::shared_ptr& latest_topology) { + std::shared_ptr original_writer, + std::shared_ptr latest_topology) { auto original_instance = original_writer->instance_name; if (original_instance.empty()) return false; auto latest_writer = latest_topology->get_writer(); @@ -181,54 +184,57 @@ WAIT_NEW_WRITER_HANDLER::WAIT_NEW_WRITER_HANDLER( WAIT_NEW_WRITER_HANDLER::~WAIT_NEW_WRITER_HANDLER() {} void WAIT_NEW_WRITER_HANDLER::operator()( + int id, std::shared_ptr original_writer, - FAILOVER_SYNC& f_sync, - WRITER_FAILOVER_RESULT& result) { + std::shared_ptr f_sync, + std::shared_ptr result) { - MYLOG_TRACE(logger.get(), dbc_id, "[WAIT_NEW_WRITER_HANDLER] [TaskB] Attempting to connect to a new writer instance"); + MYLOG_TRACE(logger, dbc_id, "Thread ID %d - [WAIT_NEW_WRITER_HANDLER] [TaskB] Attempting to connect to a new writer instance", id); - while (!f_sync.is_completed()) { + while (!f_sync->is_completed()) { if (!is_writer_connected()) { connect_to_reader(f_sync); refresh_topology_and_connect_to_new_writer(original_writer, f_sync); clean_up_reader_connection(); } else { - result = WRITER_FAILOVER_RESULT(true, true, current_topology, - std::move(new_connection)); - f_sync.mark_as_complete(true); - MYLOG_TRACE(logger.get(), dbc_id, "[WAIT_NEW_WRITER_HANDLER] [TaskB] Finished"); + result->connected = true; + result->is_new_host = true; + result->new_topology = current_topology; + result->new_connection = std::move(new_connection); + f_sync->mark_as_complete(true); + MYLOG_TRACE(logger, dbc_id, "Thread ID %d - [WAIT_NEW_WRITER_HANDLER] [TaskB] Finished", id); return; } } - MYLOG_TRACE(logger.get(), dbc_id, "[WAIT_NEW_WRITER_HANDLER] [TaskB] Cancelled"); + MYLOG_TRACE(logger, dbc_id, "Thread ID %d - [WAIT_NEW_WRITER_HANDLER] [TaskB] Cancelled", id); // Another thread finishes or both timeout, this thread is canceled clean_up_reader_connection(); release_new_connection(); - MYLOG_TRACE(logger.get(), dbc_id, "[WAIT_NEW_WRITER_HANDLER] [TaskB] Finished"); + MYLOG_TRACE(logger, dbc_id, "Thread ID %d - [WAIT_NEW_WRITER_HANDLER] [TaskB] Finished", id); } // Connect to a reader to later retrieve the latest topology -void WAIT_NEW_WRITER_HANDLER::connect_to_reader(FAILOVER_SYNC& f_sync) { - while (!f_sync.is_completed()) { +void WAIT_NEW_WRITER_HANDLER::connect_to_reader(std::shared_ptr f_sync) { + while (!f_sync->is_completed()) { auto connection_result = reader_handler->get_reader_connection(current_topology, f_sync); - if (connection_result.connected && connection_result.new_connection->is_connected()) { - reader_connection = connection_result.new_connection; - current_reader_host = connection_result.new_host; + if (connection_result->connected && connection_result->new_connection->is_connected()) { + reader_connection = connection_result->new_connection; + current_reader_host = connection_result->new_host; MYLOG_TRACE( - logger.get(), dbc_id, + logger, dbc_id, "[WAIT_NEW_WRITER_HANDLER] [TaskB] Connected to reader: %s", - connection_result.new_host->get_host_port_pair().c_str()); + connection_result->new_host->get_host_port_pair().c_str()); break; } - MYLOG_TRACE(logger.get(), dbc_id, "[WAIT_NEW_WRITER_HANDLER] [TaskB] Failed to connect to any reader."); + MYLOG_TRACE(logger, 
dbc_id, "[WAIT_NEW_WRITER_HANDLER] [TaskB] Failed to connect to any reader."); } } // Use just connected reader to refresh topology and try to connect to a new writer void WAIT_NEW_WRITER_HANDLER::refresh_topology_and_connect_to_new_writer( - std::shared_ptr original_writer, FAILOVER_SYNC& f_sync) { - while (!f_sync.is_completed()) { + std::shared_ptr original_writer, std::shared_ptr f_sync) { + while (!f_sync->is_completed()) { auto latest_topology = topology_service->get_topology(reader_connection, true); if (latest_topology->total_hosts() > 0) { current_topology = latest_topology; @@ -246,7 +252,7 @@ void WAIT_NEW_WRITER_HANDLER::refresh_topology_and_connect_to_new_writer( bool WAIT_NEW_WRITER_HANDLER::connect_to_writer( std::shared_ptr writer_candidate) { - MYLOG_TRACE(logger.get(), dbc_id, + MYLOG_TRACE(logger, dbc_id, "[WAIT_NEW_WRITER_HANDLER] [TaskB] Trying to connect to a new writer: %s", writer_candidate->get_host_port_pair().c_str()); @@ -275,11 +281,13 @@ FAILOVER_WRITER_HANDLER::FAILOVER_WRITER_HANDLER( std::shared_ptr topology_service, std::shared_ptr reader_handler, std::shared_ptr connection_handler, + ctpl::thread_pool& thread_pool, int writer_failover_timeout_ms, int read_topology_interval_ms, int reconnect_writer_interval_ms, unsigned long dbc_id, bool enable_logging) : connection_handler{connection_handler}, topology_service{topology_service}, reader_handler{reader_handler}, + thread_pool{thread_pool}, writer_failover_timeout_ms{writer_failover_timeout_ms}, read_topology_interval_ms{read_topology_interval_ms}, reconnect_writer_interval_ms{reconnect_writer_interval_ms}, @@ -291,17 +299,20 @@ FAILOVER_WRITER_HANDLER::FAILOVER_WRITER_HANDLER( FAILOVER_WRITER_HANDLER::~FAILOVER_WRITER_HANDLER() {} -WRITER_FAILOVER_RESULT FAILOVER_WRITER_HANDLER::failover( +std::shared_ptr FAILOVER_WRITER_HANDLER::failover( std::shared_ptr current_topology) { if (!current_topology || current_topology->total_hosts() == 0) { - MYLOG_TRACE(logger.get(), dbc_id, + MYLOG_TRACE(logger, dbc_id, "[FAILOVER_WRITER_HANDLER] Failover was called with " "an invalid (null or empty) topology"); - return WRITER_FAILOVER_RESULT(false, false, nullptr, nullptr); + return std::make_shared(false, false, nullptr, nullptr); } - FAILOVER_SYNC failover_sync(2); + const auto start = std::chrono::steady_clock::now(); + + auto failover_sync = std::make_shared(2); + // Constructing the function objects RECONNECT_TO_WRITER_HANDLER reconnect_handler( connection_handler, topology_service, reconnect_writer_interval_ms, dbc_id, logger != nullptr); @@ -312,42 +323,62 @@ WRITER_FAILOVER_RESULT FAILOVER_WRITER_HANDLER::failover( auto original_writer = current_topology->get_writer(); topology_service->mark_host_down(original_writer); - auto reconnect_result = WRITER_FAILOVER_RESULT(false, false, nullptr, nullptr); - auto new_writer_result = WRITER_FAILOVER_RESULT(false, false, nullptr, nullptr); - - // Try reconnecting to the original writer host - auto reconnect_future = std::async(std::launch::async, std::ref(reconnect_handler), - original_writer, std::ref(failover_sync), - std::ref(reconnect_result)); - - // Concurrently see if topology has changed and try connecting to a new writer - auto new_writer_future = std::async(std::launch::async, std::ref(new_writer_handler), - std::cref(original_writer), std::ref(failover_sync), - std::ref(new_writer_result)); + auto reconnect_result = std::make_shared(false, false, nullptr, nullptr); + auto new_writer_result = std::make_shared(false, false, nullptr, nullptr); - 
failover_sync.wait_and_complete(writer_failover_timeout_ms); - - if (reconnect_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { - reconnect_future.get(); + if (thread_pool.n_idle() <= 1) { + int size = thread_pool.size() + 2 - thread_pool.n_idle(); + MYLOG_TRACE(logger, dbc_id, + "[FAILOVER_WRITER_HANDLER] Resizing thread pool to %d", size); + thread_pool.resize(size); } - if (new_writer_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { - new_writer_future.get(); - } + auto reconnect_future = thread_pool.push(std::move(reconnect_handler), original_writer, failover_sync, reconnect_result); + auto wait_new_writer_future = thread_pool.push(std::move(new_writer_handler), original_writer, failover_sync, new_writer_result); - if (reconnect_result.connected) { - MYLOG_TRACE(logger.get(), dbc_id, + // Wait for task complete signal with specified timeout + failover_sync->wait_and_complete(writer_failover_timeout_ms); + + // Constantly polling for results until timeout + while (true) { + // Check if reconnect task result is ready + if (reconnect_future.valid() && reconnect_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { + reconnect_future.get(); + if (reconnect_result->connected) { + MYLOG_TRACE(logger, dbc_id, "[FAILOVER_WRITER_HANDLER] Successfully re-connected to the current writer instance: %s", - reconnect_result.new_topology->get_writer()->get_host_port_pair().c_str()); - return reconnect_result; - } else if (new_writer_result.connected) { - MYLOG_TRACE(logger.get(), dbc_id, + reconnect_result->new_topology->get_writer()->get_host_port_pair().c_str()); + return reconnect_result; + } + } + + // Check if wait new writer task result is ready + if (wait_new_writer_future.valid() && wait_new_writer_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { + wait_new_writer_future.get(); + if (new_writer_result->connected) { + MYLOG_TRACE(logger, dbc_id, "[FAILOVER_WRITER_HANDLER] Successfully connected to the new writer instance: %s", - new_writer_result.new_topology->get_writer()->get_host_port_pair().c_str()); - return new_writer_result; + new_writer_result->new_topology->get_writer()->get_host_port_pair().c_str()); + return new_writer_result; + } + } + + // Results are ready but none has a valid connection + if (!reconnect_future.valid() && !wait_new_writer_future.valid()) { + break; + } + + // No result is ready, update remaining timeout + const auto duration = std::chrono::duration_cast(std::chrono::steady_clock::now() - start); + const auto remaining_wait_ms = writer_failover_timeout_ms - duration.count(); + if (remaining_wait_ms <= 0) { + // Writer failover timed out + MYLOG_TRACE(logger, dbc_id, "[FAILOVER_WRITER_HANDLER] Writer failover timed out. 
Failed to connect to the writer instance."); + return std::make_shared(false, false, nullptr, nullptr); + } } - // timeout - MYLOG_TRACE(logger.get(), dbc_id, "[FAILOVER_WRITER_HANDLER] Failed to connect to the writer instance."); - return WRITER_FAILOVER_RESULT(false, false, nullptr, nullptr); + // Writer failover finished but not connected + MYLOG_TRACE(logger, dbc_id, "[FAILOVER_WRITER_HANDLER] Failed to connect to the writer instance."); + return std::make_shared(false, false, nullptr, nullptr); } diff --git a/driver/host_info.cc b/driver/host_info.cc index 05ef63488..ba9e3beba 100644 --- a/driver/host_info.cc +++ b/driver/host_info.cc @@ -113,6 +113,6 @@ void HOST_INFO::mark_as_writer(bool writer) { } // Check if two host info have same instance name -bool HOST_INFO::is_host_same(const std::shared_ptr& h1, const std::shared_ptr& h2) { +bool HOST_INFO::is_host_same(std::shared_ptr h1, std::shared_ptr h2) { return h1->instance_name == h2->instance_name; } diff --git a/driver/host_info.h b/driver/host_info.h index d9d442edd..e5c64a420 100644 --- a/driver/host_info.h +++ b/driver/host_info.h @@ -57,7 +57,7 @@ class HOST_INFO { bool is_host_down(); bool is_host_writer(); void mark_as_writer(bool writer); - static bool is_host_same(const std::shared_ptr& h1, const std::shared_ptr& h2); + static bool is_host_same(std::shared_ptr h1, std::shared_ptr h2); static constexpr int NO_PORT = -1; // used to be properties - TODO - remove the ones that are not necessary diff --git a/driver/monitor.cc b/driver/monitor.cc index eac8cef25..2f261aa86 100644 --- a/driver/monitor.cc +++ b/driver/monitor.cc @@ -79,6 +79,7 @@ MONITOR::~MONITOR() { } if (this->connection_proxy) { + this->connection_proxy->delete_ds(); delete this->connection_proxy; this->connection_proxy = nullptr; } @@ -103,7 +104,7 @@ void MONITOR::start_monitoring(std::shared_ptr conte void MONITOR::stop_monitoring(std::shared_ptr context) { if (context == nullptr) { MYLOG_TRACE( - this->logger.get(), 0, + this->logger, 0, "[MONITOR] Invalid context passed into stop_monitoring()"); return; } diff --git a/driver/monitor_connection_context.cc b/driver/monitor_connection_context.cc index 1fe5e8aeb..ae846d828 100644 --- a/driver/monitor_connection_context.cc +++ b/driver/monitor_connection_context.cc @@ -165,14 +165,14 @@ void MONITOR_CONNECTION_CONTEXT::set_connection_valid( const auto max_invalid_node_duration = get_failure_detection_interval() * (std::max)(0, get_failure_detection_count()); if (invalid_node_duration_ms >= max_invalid_node_duration) { - MYLOG_TRACE(logger.get(), get_dbc_id(), "[MONITOR_CONNECTION_CONTEXT] Node '%s' is *dead*.", node_keys_str.c_str()); + MYLOG_TRACE(logger, get_dbc_id(), "[MONITOR_CONNECTION_CONTEXT] Node '%s' is *dead*.", node_keys_str.c_str()); set_node_unhealthy(true); abort_connection(); return; } MYLOG_TRACE( - logger.get(), get_dbc_id(), + logger, get_dbc_id(), "[MONITOR_CONNECTION_CONTEXT] Node '%s' is *not responding* (%d).", node_keys_str.c_str(), get_failure_count()); return; } @@ -180,7 +180,7 @@ void MONITOR_CONNECTION_CONTEXT::set_connection_valid( set_failure_count(0); reset_invalid_node_start_time(); set_node_unhealthy(false); - MYLOG_TRACE(logger.get(), get_dbc_id(), "[MONITOR_CONNECTION_CONTEXT] Node '%s' is *alive*.", node_keys_str.c_str()); + MYLOG_TRACE(logger, get_dbc_id(), "[MONITOR_CONNECTION_CONTEXT] Node '%s' is *alive*.", node_keys_str.c_str()); } void MONITOR_CONNECTION_CONTEXT::abort_connection() { diff --git a/driver/monitor_service.cc b/driver/monitor_service.cc index 
bf2e0a2e4..0643710a9 100644 --- a/driver/monitor_service.cc +++ b/driver/monitor_service.cc @@ -58,13 +58,13 @@ std::shared_ptr MONITOR_SERVICE::start_monitoring( if (!dbc || !ds) { auto msg = "[MONITOR_SERVICE] Parameter dbc or ds cannot be null"; - MYLOG_TRACE(this->logger.get(), dbc ? dbc->id : 0, msg); + MYLOG_TRACE(this->logger, dbc ? dbc->id : 0, msg); throw std::invalid_argument(msg); } if (node_keys.empty()) { auto msg = "[MONITOR_SERVICE] Parameter node_keys cannot be empty"; - MYLOG_TRACE(this->logger.get(), dbc ? dbc->id : 0, msg); + MYLOG_TRACE(this->logger, dbc ? dbc->id : 0, msg); throw std::invalid_argument(msg); } @@ -96,7 +96,7 @@ std::shared_ptr MONITOR_SERVICE::start_monitoring( void MONITOR_SERVICE::stop_monitoring(std::shared_ptr context) { if (context == nullptr) { MYLOG_TRACE( - this->logger.get(), 0, + this->logger, 0, "[MONITOR_SERVICE] Invalid context passed into stop_monitoring()"); return; } @@ -106,7 +106,7 @@ void MONITOR_SERVICE::stop_monitoring(std::shared_ptrthread_container->get_node(context->get_node_keys()); if (node.empty()) { MYLOG_TRACE( - this->logger.get(), context->get_dbc_id(), + this->logger, context->get_dbc_id(), "[MONITOR_SERVICE] Can not find node key from context"); return; } @@ -121,7 +121,7 @@ void MONITOR_SERVICE::stop_monitoring_for_all_connections(std::set std::string node = this->thread_container->get_node(node_keys); if (node.empty()) { MYLOG_TRACE( - this->logger.get(), 0, + this->logger, 0, "[MONITOR_SERVICE] Invalid node keys passed into stop_monitoring_for_all_connections(). " "No existing monitor for the given set of node keys"); return; @@ -137,7 +137,7 @@ void MONITOR_SERVICE::stop_monitoring_for_all_connections(std::set void MONITOR_SERVICE::notify_unused(const std::shared_ptr& monitor) const { if (monitor == nullptr) { MYLOG_TRACE( - this->logger.get(), 0, + this->logger, 0, "[MONITOR_SERVICE] Invalid monitor passed into notify_unused()"); return; } diff --git a/driver/mylog.cc b/driver/mylog.cc index 54c22b73c..fc9cbb38c 100644 --- a/driver/mylog.cc +++ b/driver/mylog.cc @@ -41,7 +41,10 @@ #include #endif -void trace_print(FILE *file, unsigned long dbc_id, const char *fmt, ...) { +std::mutex log_file_mutex; +std::shared_ptr log_file; + +void trace_print(std::shared_ptr file, unsigned long dbc_id, const char *fmt, ...) { if (file && fmt) { time_t now = time(nullptr); char time_buf[256]; @@ -63,10 +66,12 @@ void trace_print(FILE *file, unsigned long dbc_id, const char *fmt, ...) { pid_t pid; pid = getpid(); #endif - - fprintf(file, "%s - Process ID %ld - DBC ID %lu - %s\n", time_buf, pid, - dbc_id, buf.data()); - fflush(file); + { + std::lock_guard guard(log_file_mutex); + fprintf(file.get(), "%s - Process ID %ld - DBC ID %lu - %s\n", time_buf, pid, + dbc_id, buf.data()); + fflush(file.get()); + } } } diff --git a/driver/mylog.h b/driver/mylog.h index b352e7724..e496738f2 100644 --- a/driver/mylog.h +++ b/driver/mylog.h @@ -38,11 +38,11 @@ #define MYLOG_STMT_TRACE(A, B) \ { \ if ((A)->dbc->ds->save_queries) \ - trace_print((A)->dbc->log_file.get(), (A)->dbc->id, (const char *)B); \ + trace_print((A)->dbc->log_file, (A)->dbc->id, (const char *)B); \ } #define MYLOG_DBC_TRACE(A, ...) \ - { trace_print((A)->log_file.get(), (A)->id, __VA_ARGS__); } + { trace_print((A)->log_file, (A)->id, __VA_ARGS__); } #define MYLOG_TRACE(A, B, ...) 
\ { \ @@ -59,12 +59,12 @@ struct FILEDeleter { } }; -static std::shared_ptr log_file; -static std::mutex log_file_mutex; +extern std::shared_ptr log_file; +extern std::mutex log_file_mutex; /* Functions used when debugging */ std::shared_ptr init_log_file(); void end_log_file(); -void trace_print(FILE *file, unsigned long dbc_id, const char *fmt, ...); +void trace_print(std::shared_ptr file, unsigned long dbc_id, const char *fmt, ...); #endif /* __MYLOG_H__ */ diff --git a/driver/topology_service.cc b/driver/topology_service.cc index a92632d3e..4ef132b8e 100644 --- a/driver/topology_service.cc +++ b/driver/topology_service.cc @@ -58,7 +58,7 @@ TOPOLOGY_SERVICE::~TOPOLOGY_SERVICE() { } void TOPOLOGY_SERVICE::set_cluster_id(std::string cid) { - MYLOG_TRACE(logger.get(), dbc_id, "[TOPOLOGY_SERVICE] cluster ID=%s", cid.c_str()); + MYLOG_TRACE(logger, dbc_id, "[TOPOLOGY_SERVICE] cluster ID=%s", cid.c_str()); this->cluster_id = cid; metrics_container->set_cluster_id(this->cluster_id); } @@ -71,7 +71,7 @@ void TOPOLOGY_SERVICE::set_cluster_instance_template(std::shared_ptr if (cluster_instance_host) cluster_instance_host.reset(); - MYLOG_TRACE(logger.get(), dbc_id, + MYLOG_TRACE(logger, dbc_id, "[TOPOLOGY_SERVICE] cluster instance host=%s, port=%d", host_template->get_host().c_str(), host_template->get_port()); cluster_instance_host = host_template; @@ -300,7 +300,7 @@ std::shared_ptr TOPOLOGY_SERVICE::query_for_topology(CONN topology_info->is_multi_writer_cluster = writer_count > 1; if (writer_count == 0) { - MYLOG_TRACE(logger.get(), dbc_id, + MYLOG_TRACE(logger, dbc_id, "[TOPOLOGY_SERVICE] The topology query returned an " "invalid topology - no writer instance detected"); } diff --git a/integration/CMakeLists.txt b/integration/CMakeLists.txt index 83e0d4b68..fe716112a 100644 --- a/integration/CMakeLists.txt +++ b/integration/CMakeLists.txt @@ -102,7 +102,8 @@ set(INTEGRATION_TESTS iam_authentication_integration_test.cc secrets_manager_integration_test.cc network_failover_integration_test.cc - failover_integration_test.cc) + failover_integration_test.cc + ) if(NOT ENABLE_PERFORMANCE_TESTS) set(TEST_SOURCES ${TEST_SOURCES} ${INTEGRATION_TESTS}) diff --git a/integration/failover_integration_test.cc b/integration/failover_integration_test.cc index 20fb3be0d..246eb90fc 100644 --- a/integration/failover_integration_test.cc +++ b/integration/failover_integration_test.cc @@ -37,7 +37,7 @@ class FailoverIntegrationTest : public BaseFailoverIntegrationTest { Aws::Auth::AWSCredentials credentials = Aws::Auth::AWSCredentials(Aws::String(ACCESS_KEY), Aws::String(SECRET_ACCESS_KEY), Aws::String(SESSION_TOKEN)); - Aws::Client::ClientConfiguration client_config; + Aws::RDS::RDSClientConfiguration client_config; Aws::RDS::RDSClient rds_client; SQLHENV env = nullptr; SQLHDBC dbc = nullptr; diff --git a/integration/failover_performance_test.cc b/integration/failover_performance_test.cc index 31c74a036..939162a94 100644 --- a/integration/failover_performance_test.cc +++ b/integration/failover_performance_test.cc @@ -150,7 +150,7 @@ class FailoverPerformanceTest : Aws::Auth::AWSCredentials credentials = Aws::Auth::AWSCredentials(Aws::String(ACCESS_KEY), Aws::String(SECRET_ACCESS_KEY), Aws::String(SESSION_TOKEN)); - Aws::Client::ClientConfiguration client_config; + Aws::RDS::RDSClientConfiguration client_config; Aws::RDS::RDSClient rds_client; SQLHENV env = nullptr; SQLHDBC dbc = nullptr; diff --git a/integration/network_failover_integration_test.cc b/integration/network_failover_integration_test.cc index 
2c3630e57..e9a7cde8c 100644 --- a/integration/network_failover_integration_test.cc +++ b/integration/network_failover_integration_test.cc @@ -37,7 +37,7 @@ class NetworkFailoverIntegrationTest : public BaseFailoverIntegrationTest { Aws::Auth::AWSCredentials credentials = Aws::Auth::AWSCredentials(Aws::String(ACCESS_KEY), Aws::String(SECRET_ACCESS_KEY), Aws::String(SESSION_TOKEN)); - Aws::Client::ClientConfiguration client_config; + Aws::RDS::RDSClientConfiguration client_config; Aws::RDS::RDSClient rds_client; SQLHENV env = nullptr; SQLHDBC dbc = nullptr; diff --git a/testframework/src/test/java/host/IntegrationContainerTest.java b/testframework/src/test/java/host/IntegrationContainerTest.java index 5c7090d7b..1f093428c 100644 --- a/testframework/src/test/java/host/IntegrationContainerTest.java +++ b/testframework/src/test/java/host/IntegrationContainerTest.java @@ -133,7 +133,6 @@ public void testRunCommunityTestInContainer() public void testRunFailoverTestInContainer() throws UnsupportedOperationException, IOException, InterruptedException { setupFailoverIntegrationTests(NETWORK); - containerHelper.runExecutable(testContainer, "build/integration/bin", "integration"); } diff --git a/unit_testing/failover_reader_handler_test.cc b/unit_testing/failover_reader_handler_test.cc index 4a8569f7a..5b34e2a40 100644 --- a/unit_testing/failover_reader_handler_test.cc +++ b/unit_testing/failover_reader_handler_test.cc @@ -38,6 +38,7 @@ using ::testing::_; using ::testing::AnyNumber; +using ::testing::AtLeast; using ::testing::Invoke; using ::testing::Mock; using ::testing::Return; @@ -60,12 +61,14 @@ class FailoverReaderHandlerTest : public testing::Test { MOCK_CONNECTION_PROXY* mock_reader_a_proxy; MOCK_CONNECTION_PROXY* mock_reader_b_proxy; MOCK_CONNECTION_PROXY* mock_writer_proxy; + ctpl::thread_pool failover_thread_pool; static std::shared_ptr topology; std::shared_ptr mock_ts; std::shared_ptr mock_connection_handler; - MOCK_FAILOVER_SYNC mock_sync; + std::shared_ptr mock_sync; + static void SetUpTestSuite() { reader_a_host = std::make_shared("reader-a-host" + HOST_SUFFIX, 1234, UP, false); @@ -91,7 +94,6 @@ class FailoverReaderHandlerTest : public testing::Test { void SetUp() override { allocate_odbc_handles(env, dbc, ds); - reader_a_host->set_host_state(UP); reader_b_host->set_host_state(UP); reader_c_host->set_host_state(DOWN); @@ -99,7 +101,8 @@ class FailoverReaderHandlerTest : public testing::Test { mock_ts = std::make_shared(); mock_connection_handler = std::make_shared(); - EXPECT_CALL(mock_sync, is_completed()).WillRepeatedly(Return(false)); + mock_sync = std::make_shared(); + EXPECT_CALL(*mock_sync, is_completed()).WillRepeatedly(Return(false)); } void TearDown() override { @@ -155,7 +158,7 @@ TEST_F(FailoverReaderHandlerTest, GenerateTopology) { } TEST_F(FailoverReaderHandlerTest, BuildHostsList) { - FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, 60000, 30000, false, 0); + FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, failover_thread_pool, 60000, 30000, false, 0); std::shared_ptr topology_info; std::vector> hosts_list; @@ -227,12 +230,12 @@ TEST_F(FailoverReaderHandlerTest, GetConnectionFromHosts_Failure) { EXPECT_CALL(*mock_ts, mark_host_down(reader_c_host)).Times(1); EXPECT_CALL(*mock_ts, mark_host_down(writer_host)).Times(1); - FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, 60000, 30000, false, 0); + FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, failover_thread_pool, 60000, 
30000, false, 0); auto hosts_list = reader_handler.build_hosts_list(topology, true); - READER_FAILOVER_RESULT result = reader_handler.get_connection_from_hosts(hosts_list, std::ref(mock_sync)); + auto result = reader_handler.get_connection_from_hosts(hosts_list, mock_sync); - EXPECT_FALSE(result.connected); - EXPECT_THAT(result.new_connection, nullptr); + EXPECT_FALSE(result->connected); + EXPECT_THAT(result->new_connection, nullptr); } // Verify that reader failover handler connects to a reader node that is marked up. @@ -250,13 +253,13 @@ TEST_F(FailoverReaderHandlerTest, GetConnectionFromHosts_Success_Reader) { // Reader C will not be used as it is put at the end. Will only try to connect to A and B EXPECT_CALL(*mock_ts, mark_host_up(reader_a_host)).Times(1); - FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, 60000, 30000, false, 0); + FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, failover_thread_pool, 60000, 30000, false, 0); auto hosts_list = reader_handler.build_hosts_list(topology, true); - READER_FAILOVER_RESULT result = reader_handler.get_connection_from_hosts(hosts_list, std::ref(mock_sync)); + auto result = reader_handler.get_connection_from_hosts(hosts_list, mock_sync); - EXPECT_TRUE(result.connected); - EXPECT_THAT(result.new_connection, mock_reader_a_proxy); - EXPECT_FALSE(result.new_host->is_host_writer()); + EXPECT_TRUE(result->connected); + EXPECT_THAT(result->new_connection, mock_reader_a_proxy); + EXPECT_FALSE(result->new_host->is_host_writer()); // Explicit delete on reader A as it is returned as valid connection/result delete mock_reader_a_proxy; @@ -276,13 +279,13 @@ TEST_F(FailoverReaderHandlerTest, GetConnectionFromHosts_Success_Writer) { EXPECT_CALL(*mock_ts, mark_host_up(writer_host)).Times(1); - FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, 60000, 30000, false, 0); + FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, failover_thread_pool, 60000, 30000, false, 0); auto hosts_list = reader_handler.build_hosts_list(topology, true); - READER_FAILOVER_RESULT result = reader_handler.get_connection_from_hosts(hosts_list, std::ref(mock_sync)); + auto result = reader_handler.get_connection_from_hosts(hosts_list, mock_sync); - EXPECT_TRUE(result.connected); - EXPECT_THAT(result.new_connection, mock_writer_proxy); - EXPECT_TRUE(result.new_host->is_host_writer()); + EXPECT_TRUE(result->connected); + EXPECT_THAT(result->new_connection, mock_writer_proxy); + EXPECT_TRUE(result->new_host->is_host_writer()); // Explicit delete as it is returned as result & is not deconstructed during failover delete mock_writer_proxy; @@ -313,13 +316,13 @@ TEST_F(FailoverReaderHandlerTest, GetConnectionFromHosts_FastestHost) { return mock_reader_b_proxy; })); - FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, 60000, 30000, false, 0); + FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, failover_thread_pool, 60000, 30000, false, 0); auto hosts_list = reader_handler.build_hosts_list(topology, true); - READER_FAILOVER_RESULT result = reader_handler.get_connection_from_hosts(hosts_list, std::ref(mock_sync)); + auto result = reader_handler.get_connection_from_hosts(hosts_list, mock_sync); - EXPECT_TRUE(result.connected); - EXPECT_THAT(result.new_connection, mock_reader_a_proxy); - EXPECT_FALSE(result.new_host->is_host_writer()); + EXPECT_TRUE(result->connected); + EXPECT_THAT(result->new_connection, mock_reader_a_proxy); + 
EXPECT_FALSE(result->new_host->is_host_writer()); // Explicit delete on reader A as it is returned as a valid result delete mock_reader_a_proxy; @@ -352,12 +355,12 @@ TEST_F(FailoverReaderHandlerTest, GetConnectionFromHosts_Timeout) { EXPECT_CALL(*mock_ts, mark_host_down(_)).Times(AnyNumber()); EXPECT_CALL(*mock_ts, mark_host_down(writer_host)).Times(1); - FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, 60000, 1000, false, 0); + FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, failover_thread_pool, 60000, 1000, false, 0); auto hosts_list = reader_handler.build_hosts_list(topology, true); - READER_FAILOVER_RESULT result = reader_handler.get_connection_from_hosts(hosts_list, std::ref(mock_sync)); + auto result = reader_handler.get_connection_from_hosts(hosts_list, mock_sync); - EXPECT_FALSE(result.connected); - EXPECT_THAT(result.new_connection, nullptr); + EXPECT_FALSE(result->connected); + EXPECT_THAT(result->new_connection, nullptr); } // Verify that reader failover handler fails to connect to any reader node or @@ -367,16 +370,16 @@ TEST_F(FailoverReaderHandlerTest, Failover_Failure) { EXPECT_CALL(*mock_connection_handler, connect(_, nullptr)).WillRepeatedly(Return(nullptr)); - EXPECT_CALL(*mock_ts, mark_host_down(reader_a_host)).Times(1); - EXPECT_CALL(*mock_ts, mark_host_down(reader_b_host)).Times(1); - EXPECT_CALL(*mock_ts, mark_host_down(reader_c_host)).Times(1); - EXPECT_CALL(*mock_ts, mark_host_down(writer_host)).Times(1); + EXPECT_CALL(*mock_ts, mark_host_down(reader_a_host)).Times(AtLeast(1)); + EXPECT_CALL(*mock_ts, mark_host_down(reader_b_host)).Times(AtLeast(1)); + EXPECT_CALL(*mock_ts, mark_host_down(reader_c_host)).Times(AtLeast(1)); + EXPECT_CALL(*mock_ts, mark_host_down(writer_host)).Times(AtLeast(1)); - FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, 3000, 1000, false, 0); - READER_FAILOVER_RESULT result = reader_handler.failover(topology); + FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, failover_thread_pool, 3000, 1000, false, 0); + auto result = reader_handler.failover(topology); - EXPECT_FALSE(result.connected); - EXPECT_THAT(result.new_connection, nullptr); + EXPECT_FALSE(result->connected); + EXPECT_THAT(result->new_connection, nullptr); } // Verify that reader failover handler connects to a faster reader node. 
@@ -404,18 +407,18 @@ TEST_F(FailoverReaderHandlerTest, Failover_Success_Reader) {
   EXPECT_CALL(*mock_connection_handler, connect(_, nullptr)).WillRepeatedly(Return(nullptr));
   EXPECT_CALL(*mock_connection_handler, connect(reader_a_host, nullptr)).WillRepeatedly(
       Return(mock_reader_a_proxy));
-  EXPECT_CALL(*mock_connection_handler, connect(reader_b_host, nullptr)).WillRepeatedly(Invoke([&]() {
+  EXPECT_CALL(*mock_connection_handler, connect(reader_b_host, nullptr)).WillRepeatedly(Invoke([=]() {
     std::this_thread::sleep_for(std::chrono::milliseconds(5000));
     return mock_reader_b_proxy;
   }));
 
-  FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, 60000, 30000, false, 0);
-  READER_FAILOVER_RESULT result = reader_handler.failover(current_topology);
+  FAILOVER_READER_HANDLER reader_handler(mock_ts, mock_connection_handler, failover_thread_pool, 60000, 30000, false, 0);
+  auto result = reader_handler.failover(current_topology);
 
-  EXPECT_TRUE(result.connected);
-  EXPECT_THAT(result.new_connection, mock_reader_a_proxy);
-  EXPECT_FALSE(result.new_host->is_host_writer());
-  EXPECT_EQ("reader-a-host", result.new_host->instance_name);
+  EXPECT_TRUE(result->connected);
+  EXPECT_THAT(result->new_connection, mock_reader_a_proxy);
+  EXPECT_FALSE(result->new_host->is_host_writer());
+  EXPECT_EQ("reader-a-host", result->new_host->instance_name);
 
   // Explicit delete on reader A as it's returned as a valid result
   delete mock_reader_a_proxy;
diff --git a/unit_testing/failover_writer_handler_test.cc b/unit_testing/failover_writer_handler_test.cc
index 86b94b7e5..77ecbfb7f 100644
--- a/unit_testing/failover_writer_handler_test.cc
+++ b/unit_testing/failover_writer_handler_test.cc
@@ -69,6 +69,7 @@ class FailoverWriterHandlerTest : public testing::Test {
   MOCK_CONNECTION_PROXY* mock_reader_b_proxy;
   MOCK_CONNECTION_PROXY* mock_writer_proxy;
   MOCK_CONNECTION_PROXY* mock_new_writer_proxy;
+  ctpl::thread_pool failover_thread_pool;
 
   static void SetUpTestSuite() {}
 
@@ -76,7 +77,6 @@ class FailoverWriterHandlerTest : public testing::Test {
 
   void SetUp() override {
     allocate_odbc_handles(env, dbc, ds);
-
     writer_instance_name = "writer-host";
     new_writer_instance_name = "new-writer-host";
 
@@ -124,7 +124,7 @@ TEST_F(FailoverWriterHandlerTest, ReconnectToWriter_TaskBEmptyReaderResult) {
   EXPECT_CALL(*mock_ts, mark_host_up(writer_host)).InSequence(s);
 
   EXPECT_CALL(*mock_reader_handler, get_reader_connection(_, _))
-      .WillRepeatedly(Return(READER_FAILOVER_RESULT(false, nullptr, nullptr)));
+      .WillRepeatedly(Return(std::make_shared<READER_FAILOVER_RESULT>(false, nullptr, nullptr)));
 
   EXPECT_CALL(*mock_connection_handler, connect(writer_host, nullptr))
       .WillRepeatedly(Return(mock_writer_proxy));
@@ -134,12 +134,12 @@ TEST_F(FailoverWriterHandlerTest, ReconnectToWriter_TaskBEmptyReaderResult) {
       .WillRepeatedly(Return(nullptr));
 
   FAILOVER_WRITER_HANDLER writer_handler(
-      mock_ts, mock_reader_handler, mock_connection_handler, 5000, 2000, 2000, 0);
+      mock_ts, mock_reader_handler, mock_connection_handler, failover_thread_pool, 5000, 2000, 2000, 0);
   auto result = writer_handler.failover(current_topology);
 
-  EXPECT_TRUE(result.connected);
-  EXPECT_FALSE(result.is_new_host);
-  EXPECT_THAT(result.new_connection, mock_writer_proxy);
+  EXPECT_TRUE(result->connected);
+  EXPECT_FALSE(result->is_new_host);
+  EXPECT_THAT(result->new_connection, mock_writer_proxy);
 
   // Explicit delete on writer connection as it's returned as a valid result
   delete mock_writer_proxy;
@@ -186,21 +186,21 @@ TEST_F(FailoverWriterHandlerTest, ReconnectToWriter_SlowReaderA) {
   EXPECT_CALL(*mock_reader_handler, get_reader_connection(_, _))
       .WillRepeatedly(DoAll(
-          Invoke([](Unused, FAILOVER_SYNC& f_sync) {
-            for (int i = 0; i <= 50 && !f_sync.is_completed(); i++) {
+          Invoke([](Unused, std::shared_ptr<FAILOVER_SYNC> f_sync) {
+            for (int i = 0; i <= 50 && !f_sync->is_completed(); i++) {
               std::this_thread::sleep_for(std::chrono::milliseconds(100));
             }
           }),
-          Return(READER_FAILOVER_RESULT(true, reader_a_host,
+          Return(std::make_shared<READER_FAILOVER_RESULT>(true, reader_a_host,
                  mock_reader_a_proxy))));
 
   FAILOVER_WRITER_HANDLER writer_handler(
-      mock_ts, mock_reader_handler, mock_connection_handler, 60000, 5000, 5000, 0);
+      mock_ts, mock_reader_handler, mock_connection_handler, failover_thread_pool, 60000, 5000, 5000, 0);
   const auto result = writer_handler.failover(current_topology);
 
-  EXPECT_TRUE(result.connected);
-  EXPECT_FALSE(result.is_new_host);
-  EXPECT_THAT(result.new_connection, mock_writer_proxy);
+  EXPECT_TRUE(result->connected);
+  EXPECT_FALSE(result->is_new_host);
+  EXPECT_THAT(result->new_connection, mock_writer_proxy);
 
   // Explicit delete on writer connection as it's returned as a valid result
   delete mock_writer_proxy;
 
@@ -236,16 +236,16 @@ TEST_F(FailoverWriterHandlerTest, ReconnectToWriter_TaskBDefers) {
   EXPECT_CALL(*mock_ts, mark_host_up(writer_host)).InSequence(s);
 
   EXPECT_CALL(*mock_reader_handler, get_reader_connection(_, _))
-      .WillRepeatedly(Return(READER_FAILOVER_RESULT(true, reader_a_host,
+      .WillRepeatedly(Return(std::make_shared<READER_FAILOVER_RESULT>(true, reader_a_host,
          mock_reader_a_proxy)));
 
   FAILOVER_WRITER_HANDLER writer_handler(
-      mock_ts, mock_reader_handler, mock_connection_handler, 60000, 2000, 2000, 0);
+      mock_ts, mock_reader_handler, mock_connection_handler, failover_thread_pool, 60000, 2000, 2000, 0);
   auto result = writer_handler.failover(current_topology);
 
-  EXPECT_TRUE(result.connected);
-  EXPECT_FALSE(result.is_new_host);
-  EXPECT_THAT(result.new_connection, mock_writer_proxy);
+  EXPECT_TRUE(result->connected);
+  EXPECT_FALSE(result->is_new_host);
+  EXPECT_THAT(result->new_connection, mock_writer_proxy);
 
   // Explicit delete on writer connection as it's returned as a valid result
   delete mock_writer_proxy;
@@ -299,19 +299,19 @@ TEST_F(FailoverWriterHandlerTest, ConnectToReaderA_SlowWriter) {
   EXPECT_CALL(*mock_ts, mark_host_up(new_writer_host)).Times(1);
 
   EXPECT_CALL(*mock_reader_handler, get_reader_connection(_, _))
-      .WillRepeatedly(Return(READER_FAILOVER_RESULT(true, reader_a_host,
+      .WillRepeatedly(Return(std::make_shared<READER_FAILOVER_RESULT>(true, reader_a_host,
          mock_reader_a_proxy)));
 
   FAILOVER_WRITER_HANDLER writer_handler(
-      mock_ts, mock_reader_handler, mock_connection_handler, 60000, 5000, 5000, 0);
+      mock_ts, mock_reader_handler, mock_connection_handler, failover_thread_pool, 60000, 5000, 5000, 0);
   auto result = writer_handler.failover(current_topology);
 
-  EXPECT_TRUE(result.connected);
-  EXPECT_TRUE(result.is_new_host);
-  EXPECT_THAT(result.new_connection, mock_new_writer_proxy);
-  EXPECT_EQ(3, result.new_topology->total_hosts());
+  EXPECT_TRUE(result->connected);
+  EXPECT_TRUE(result->is_new_host);
+  EXPECT_THAT(result->new_connection, mock_new_writer_proxy);
+  EXPECT_EQ(3, result->new_topology->total_hosts());
   EXPECT_EQ(new_writer_instance_name,
-            result.new_topology->get_writer()->instance_name);
+            result->new_topology->get_writer()->instance_name);
 
   // Explicit delete on new writer connection as it's returned as a valid result
   delete mock_new_writer_proxy;
@@ -358,19 +358,19 @@ TEST_F(FailoverWriterHandlerTest, ConnectToReaderA_TaskADefers) {
   EXPECT_CALL(*mock_ts, mark_host_up(new_writer_host)).Times(1);
 
   EXPECT_CALL(*mock_reader_handler, get_reader_connection(_, _))
-      .WillRepeatedly(Return(READER_FAILOVER_RESULT(true, reader_a_host,
+      .WillRepeatedly(Return(std::make_shared<READER_FAILOVER_RESULT>(true, reader_a_host,
         mock_reader_a_proxy)));
 
   FAILOVER_WRITER_HANDLER writer_handler(
-      mock_ts, mock_reader_handler, mock_connection_handler, 60000, 5000, 5000, 0);
+      mock_ts, mock_reader_handler, mock_connection_handler, failover_thread_pool, 60000, 5000, 5000, 0);
   auto result = writer_handler.failover(current_topology);
 
-  EXPECT_TRUE(result.connected);
-  EXPECT_TRUE(result.is_new_host);
-  EXPECT_THAT(result.new_connection, mock_new_writer_proxy);
-  EXPECT_EQ(4, result.new_topology->total_hosts());
+  EXPECT_TRUE(result->connected);
+  EXPECT_TRUE(result->is_new_host);
+  EXPECT_THAT(result->new_connection, mock_new_writer_proxy);
+  EXPECT_EQ(4, result->new_topology->total_hosts());
   EXPECT_EQ(new_writer_instance_name,
-            result.new_topology->get_writer()->instance_name);
+            result->new_topology->get_writer()->instance_name);
 
   // Explicit delete on new writer connection as it's returned as a valid result
   delete mock_new_writer_proxy;
@@ -427,16 +427,16 @@ TEST_F(FailoverWriterHandlerTest, FailedToConnect_FailoverTimeout) {
   EXPECT_CALL(*mock_ts, mark_host_up(new_writer_host)).Times(1);
 
   EXPECT_CALL(*mock_reader_handler, get_reader_connection(_, _))
-      .WillRepeatedly(Return(READER_FAILOVER_RESULT(true, reader_a_host,
+      .WillRepeatedly(Return(std::make_shared<READER_FAILOVER_RESULT>(true, reader_a_host,
         mock_reader_a_proxy)));
 
   FAILOVER_WRITER_HANDLER writer_handler(
-      mock_ts, mock_reader_handler, mock_connection_handler, 1000, 2000, 2000, 0);
+      mock_ts, mock_reader_handler, mock_connection_handler, failover_thread_pool, 1000, 2000, 2000, 0);
   auto result = writer_handler.failover(current_topology);
 
-  EXPECT_FALSE(result.connected);
-  EXPECT_FALSE(result.is_new_host);
-  EXPECT_THAT(result.new_connection, nullptr);
+  EXPECT_FALSE(result->connected);
+  EXPECT_FALSE(result->is_new_host);
+  EXPECT_THAT(result->new_connection, nullptr);
 
   // delete reader b explicitly, since get_reader_connection() is mocked
   delete mock_reader_b_proxy;
@@ -476,16 +476,16 @@ TEST_F(FailoverWriterHandlerTest, FailedToConnect_TaskAFailed_TaskBWriterFailed)
   EXPECT_CALL(*mock_ts, mark_host_down(new_writer_host)).Times(AtLeast(1));
 
   EXPECT_CALL(*mock_reader_handler, get_reader_connection(_, _))
-      .WillRepeatedly(Return(READER_FAILOVER_RESULT(true, reader_a_host,
+      .WillRepeatedly(Return(std::make_shared<READER_FAILOVER_RESULT>(true, reader_a_host,
         mock_reader_a_proxy)));
 
   FAILOVER_WRITER_HANDLER writer_handler(
-      mock_ts, mock_reader_handler, mock_connection_handler, 5000, 2000, 2000, 0);
+      mock_ts, mock_reader_handler, mock_connection_handler, failover_thread_pool, 5000, 2000, 2000, 0);
   auto result = writer_handler.failover(current_topology);
 
-  EXPECT_FALSE(result.connected);
-  EXPECT_FALSE(result.is_new_host);
-  EXPECT_THAT(result.new_connection, nullptr);
+  EXPECT_FALSE(result->connected);
+  EXPECT_FALSE(result->is_new_host);
+  EXPECT_THAT(result->new_connection, nullptr);
 
   // delete reader b explicitly, since get_reader_connection() is mocked
   delete mock_reader_b_proxy;
diff --git a/unit_testing/mock_objects.h b/unit_testing/mock_objects.h
index e234b633d..156b18e15 100644
--- a/unit_testing/mock_objects.h
+++ b/unit_testing/mock_objects.h
@@ -108,15 +108,17 @@ class MOCK_TOPOLOGY_SERVICE : public TOPOLOGY_SERVICE {
 
 class MOCK_READER_HANDLER : public FAILOVER_READER_HANDLER {
  public:
-  MOCK_READER_HANDLER() : FAILOVER_READER_HANDLER(nullptr, nullptr, 0, 0, false, 0) {}
-  MOCK_METHOD(READER_FAILOVER_RESULT, get_reader_connection,
-              (std::shared_ptr<CLUSTER_TOPOLOGY_INFO>, FAILOVER_SYNC&));
+  MOCK_READER_HANDLER() : FAILOVER_READER_HANDLER(nullptr, nullptr, thread_pool, 0, 0, false, 0) {}
+  MOCK_METHOD(std::shared_ptr<READER_FAILOVER_RESULT>, get_reader_connection,
+              (std::shared_ptr<CLUSTER_TOPOLOGY_INFO>, std::shared_ptr<FAILOVER_SYNC>));
+
+  ctpl::thread_pool thread_pool;
 };
 
 class MOCK_CONNECTION_HANDLER : public CONNECTION_HANDLER {
  public:
   MOCK_CONNECTION_HANDLER() : CONNECTION_HANDLER(nullptr) {}
 
-  MOCK_METHOD(CONNECTION_PROXY*, connect, (const std::shared_ptr<HOST_INFO>&, DataSource*));
+  MOCK_METHOD(CONNECTION_PROXY*, connect, (std::shared_ptr<HOST_INFO>, DataSource*));
   MOCK_METHOD(SQLRETURN, do_connect, (DBC*, DataSource*, bool));
 };