From e947b9c29d50320acdeffdd8a72e6db4227a7d56 Mon Sep 17 00:00:00 2001 From: Matt Witherspoon <32485495+spoonincode@users.noreply.github.com> Date: Tue, 7 Jul 2020 18:14:22 -0400 Subject: [PATCH 1/9] retrying_amqp_connection --- libraries/CMakeLists.txt | 1 + .../retrying_amqp_connection/CMakeLists.txt | 3 + .../retrying_amqp_connection.hpp | 268 ++++++++++++++++++ .../retrying_amqp_connection.cpp | 17 ++ 4 files changed, 289 insertions(+) create mode 100644 libraries/retrying_amqp_connection/CMakeLists.txt create mode 100644 libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp create mode 100644 libraries/retrying_amqp_connection/retrying_amqp_connection.cpp diff --git a/libraries/CMakeLists.txt b/libraries/CMakeLists.txt index f31498ab0c2..e712a788110 100644 --- a/libraries/CMakeLists.txt +++ b/libraries/CMakeLists.txt @@ -22,6 +22,7 @@ add_subdirectory( abieos ) add_subdirectory( rocksdb EXCLUDE_FROM_ALL ) add_subdirectory( chain_kv ) add_subdirectory( se-helpers ) +add_subdirectory( retrying_amqp_connection ) add_subdirectory( reliable_amqp_publisher ) set(USE_EXISTING_SOFTFLOAT ON CACHE BOOL "use pre-exisiting softfloat lib") diff --git a/libraries/retrying_amqp_connection/CMakeLists.txt b/libraries/retrying_amqp_connection/CMakeLists.txt new file mode 100644 index 00000000000..2c547ee6a79 --- /dev/null +++ b/libraries/retrying_amqp_connection/CMakeLists.txt @@ -0,0 +1,3 @@ +add_library(retrying_amqp_connection retrying_amqp_connection.cpp) +target_include_directories(retrying_amqp_connection PUBLIC include) +target_link_libraries(retrying_amqp_connection fc amqpcpp) diff --git a/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp b/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp new file mode 100644 index 00000000000..e13ec62c27e --- /dev/null +++ b/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp @@ -0,0 +1,268 @@ +#pragma once + +#include + +#include + +#include +#include + +/* + * A retrying_amqp_connection manages a connection to an AMQP server that will retry the connection + * on failure. Most users should consider single_channel_retrying_amqp_connection instead, which additionally + * manages a single channel. + */ + +namespace eosio { +template +struct retrying_amqp_connection : public AMQP::ConnectionHandler { + using connection_ready_callback_t = std::function; + using connection_failed_callback_t = std::function; + + /// \param executor executor to use for all asio operations; use a strand when appropriate as one is not used internally + /// \param address AMQP address to connect to + /// \param ready a callback when the AMQP connection has been established + /// \param failed a callback when the AMQP connection has failed after being established; should no longer use the AMQP::Connection* after this callback + /// \param logger logger to send logging to + retrying_amqp_connection(Executor& executor, const AMQP::Address& address, connection_ready_callback_t ready, + connection_failed_callback_t failed, fc::logger logger = fc::logger::get()) : + _executor(executor), _resolver(executor), _sock(executor), _timer(executor), _address(address), + _ready_callback(ready), _failed_callback(failed), _logger(logger) { + + FC_ASSERT(!_address.secure(), "Only amqp:// URIs are supported for AMQP addresses (${a})", ("a", _address)); + start_connection(); + } + + const AMQP::Address& address() const { + return _address; + } + +protected: + void onReady(AMQP::Connection* connection) override { + fc_ilog(_logger, "AMQP connection to ${s} is fully operational", ("s", _address)); + + _ready_callback(connection); + _indicated_ready = true; + } + + void onData(AMQP::Connection* connection, const char* data, size_t size) override { + if(!_sock.is_open()) + return; + _state->outgoing_queue.emplace_back(data, data+size); + send_some(); + } + + void onError(AMQP::Connection* connection, const char* message) override { + fc_elog(_logger, "AMQP connection to ${s} suffered an error; will retry shortly: ${m}", ("s", _address)("m", message)); + schedule_retry(); + } + + void onClosed(AMQP::Connection *connection) override { + fc_wlog(_logger, "AMQP connection to ${s} closed AMQP connection", ("s", _address)); + schedule_retry(); + } + +private: + void start_connection() { + _resolver.async_resolve(_address.hostname(), std::to_string(_address.port()), boost::asio::bind_executor(_executor, [this](const auto ec, const auto endpoints) { + if(ec) { + if(ec != boost::asio::error::operation_aborted) { + fc_wlog(_logger, "Failed resolving AMQP server ${s}; will retry shortly: ${m}", ("s", _address)("m", ec.message())); + schedule_retry(); + } + return; + } + //AMQP::Connection's dtor will attempt to send a last gasp message. Resetting state here is a little easier to prove + // as being safe as it requires pumping the event loop once vs placing the state reset directly in schedule_retry() + _state.emplace(); + boost::asio::async_connect(_sock, endpoints, boost::asio::bind_executor(_executor, [this](const auto ec, const auto endpoint) { + if(ec) { + if(ec != boost::asio::error::operation_aborted) { + fc_wlog(_logger, "Failed connecting AMQP server ${s}; will retry shortly: ${m}", ("s", _address)("m", ec.message())); + schedule_retry(); + } + return; + } + fc_ilog(_logger, "TCP connection to AMQP server at ${s} is up", ("s", _address)); + receive_some(); + _state->amqp_connection.emplace(this, _address.login(), _address.vhost()); + })); + })); + } + + void schedule_retry() { + if(_indicated_ready) + _failed_callback(); + _indicated_ready = false; + _sock.close(); + _resolver.cancel(); + + boost::system::error_code ec; + _timer.expires_from_now(std::chrono::seconds(1), ec); + if(ec) + return; + _timer.async_wait(boost::asio::bind_executor(_executor, [this](const auto ec) { + if(ec) + return; + start_connection(); + })); + } + + void send_some() { + if(_state->send_outstanding || _state->outgoing_queue.empty()) + return; + _state->send_outstanding = true; + boost::asio::async_write(_sock, boost::asio::buffer(_state->outgoing_queue.front()), boost::asio::bind_executor(_executor, [this](const auto& ec, size_t wrote) { + if(ec) { + if(ec != boost::asio::error::operation_aborted) { + fc_wlog(_logger, "Failed writing to AMQP server ${s}; connection will retry shortly: ${m}", ("s", _address)("m", ec.message())); + schedule_retry(); + } + return; + } + _state->outgoing_queue.pop_front(); + _state->send_outstanding = false; + send_some(); + })); + } + + void receive_some() { + _sock.async_read_some(boost::asio::buffer(_read_buff), boost::asio::bind_executor(_executor, [this](const auto& ec, size_t sz) { + if(ec) { + if(ec != boost::asio::error::operation_aborted) { + fc_wlog(_logger, "Failed reading from AMQP server ${s}; connection will retry shortly: ${m}", ("s", _address)("m", ec.message())); + schedule_retry(); + } + return; + } + _state->read_queue.insert(_state->read_queue.end(), _read_buff, _read_buff + sz); + auto used = _state->amqp_connection->parse(_state->read_queue.data(), _state->read_queue.size()); + _state->read_queue.erase(_state->read_queue.begin(), _state->read_queue.begin()+used); + + //parse() could have resulted in an error on a channel. In an earlier implementation users could call a function + // which may have caused a socket connection to .close() due to that. This check below may no longer be needed since + // retrying_amqp_connection no longer exposes a way to "bump" a connection + if(_sock.is_open()) + receive_some(); + })); + } + + char _read_buff[64*1024]; + + Executor& _executor; + + boost::asio::ip::tcp::resolver _resolver; + boost::asio::ip::tcp::socket _sock; + boost::asio::steady_timer _timer; + + AMQP::Address _address; + + connection_ready_callback_t _ready_callback; + connection_failed_callback_t _failed_callback; + bool _indicated_ready = false; + + fc::logger _logger; + + struct state { + state() {} + + std::deque> outgoing_queue; + bool send_outstanding = false; + + std::vector read_queue; + + std::optional amqp_connection; + }; + std::optional _state; + //be aware that AMQP::Connection's dtor will attempt to send a last gasp message on dtor. This means _state's + // destruction will cause onData() to be called when _state's amqp_connection dtor is fired. So be mindful of member + // dtor ordering here as _state & _sock will be accessed during dtor +}; + +template +struct single_channel_retrying_amqp_connection { + using channel_ready_callback_t = std::function; + using failed_callback_t = std::function; + + /// \param executor executor to use for all asio operations; use a strand when appropriate as one is not used internally + /// \param address AMQP address to connect to + /// \param ready a callback when the AMQP channel has been established + /// \param failed a callback when the AMQP channel has failed after being established; should no longer use the AMQP::Channel* within or after this callback + /// \param logger logger to send logging to + single_channel_retrying_amqp_connection(Executor& executor, const AMQP::Address& address, channel_ready_callback_t ready, + failed_callback_t failed, fc::logger logger = fc::logger::get()) : + _executor(executor), + _connection(executor, address, [this](AMQP::Connection* c){conn_ready(c);},[this](){conn_failed();}, logger), + _timer(executor), _channel_ready(ready), _failed(failed) + {} + + const AMQP::Address& address() const { + return _connection.address(); + } + +private: + void conn_ready(AMQP::Connection* c) { + _amqp_connection = c; + bring_up_channel(); + } + + void start_retry() { + boost::system::error_code ec; + _timer.expires_from_now(std::chrono::seconds(1), ec); + if(ec) + return; + _timer.async_wait(boost::asio::bind_executor(_executor, [this](const auto ec) { + if(ec) + return; + bring_up_channel(); + })); + } + + void set_channel_on_error() { + _amqp_channel->onError([this](const char* e) { + wlog("AMQP channel failure on AMQP connection ${c}; retrying : ${m}", ("c", _connection.address())("m", e)); + _failed(); + start_retry(); + }); + } + + void bring_up_channel() { + try { + _amqp_channel.emplace(_amqp_connection); + } + catch(...) { + wlog("AMQP channel could not start for AMQP connection ${c}; retrying", ("c", _connection.address())); + start_retry(); + } + set_channel_on_error(); + _amqp_channel->onReady([this]() { + _channel_ready(&*_amqp_channel); + //in case someone tried to set their own onError()... + set_channel_on_error(); + }); + } + + void conn_failed() { + _amqp_connection = nullptr; + _amqp_channel.reset(); + boost::system::error_code ec; + _timer.cancel(ec); + _failed(); + } + + Executor& _executor; + + retrying_amqp_connection _connection; + boost::asio::steady_timer _timer; + std::optional _amqp_channel; + AMQP::Connection* _amqp_connection = nullptr; + + channel_ready_callback_t _channel_ready; + failed_callback_t _failed; +}; + +} + +namespace fc { +void to_variant(const AMQP::Address& a, fc::variant& v); +} diff --git a/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp b/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp new file mode 100644 index 00000000000..d7cc1905503 --- /dev/null +++ b/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp @@ -0,0 +1,17 @@ +#include +#include + +namespace fc { +void to_variant(const AMQP::Address& a, fc::variant& v) { + std::string str(a.secure() ? "amqps://" : "amqp://"); + str.append(a.login().user()).append(":********").append("@"); + str.append(a.hostname().empty() ? "localhost" : a.hostname()); + if(a.port() != 5672) + str.append(":").append(std::to_string(a.port())); + str.append("/"); + if (a.vhost() != "/") + str.append(a.vhost()); + + v = str; +} +} \ No newline at end of file From 50ab2c920a679fd099c5fac067c390ead9ac47cb Mon Sep 17 00:00:00 2001 From: Matt Witherspoon <32485495+spoonincode@users.noreply.github.com> Date: Wed, 15 Jul 2020 16:01:24 -0400 Subject: [PATCH 2/9] tweak retry_connection() ordering based on recent tests --- .../retrying_amqp_connection/retrying_amqp_connection.hpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp b/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp index e13ec62c27e..756eaca8054 100644 --- a/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp +++ b/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp @@ -91,11 +91,14 @@ struct retrying_amqp_connection : public AMQP::ConnectionHandler { } void schedule_retry() { + _sock.close(); + _resolver.cancel(); + + //calling the failure callback will likely cause downstream users to take action such as closing an AMQP::Channel which + // will attempt to send data. Ensure that _sock is closed before then so onData() will drop those attempts if(_indicated_ready) _failed_callback(); _indicated_ready = false; - _sock.close(); - _resolver.cancel(); boost::system::error_code ec; _timer.expires_from_now(std::chrono::seconds(1), ec); From 1c401e1f47836cfa6683e4eb85c6ab6b091e023c Mon Sep 17 00:00:00 2001 From: Matt Witherspoon <32485495+spoonincode@users.noreply.github.com> Date: Wed, 15 Jul 2020 17:35:30 -0400 Subject: [PATCH 3/9] support for fc::fwd with 5 ctor args --- libraries/fc/include/fc/fwd.hpp | 1 + libraries/fc/include/fc/fwd_impl.hpp | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/libraries/fc/include/fc/fwd.hpp b/libraries/fc/include/fc/fwd.hpp index ccf0828e58b..1a5a01cf050 100644 --- a/libraries/fc/include/fc/fwd.hpp +++ b/libraries/fc/include/fc/fwd.hpp @@ -13,6 +13,7 @@ class fwd { template fwd( U&& u ); template fwd( U&& u, V&& v ); template fwd( U&& u, V&& v, X&&, Y&& ); + template fwd( U&& u, V&& v, X&&, Y&&, Z&& ); fwd(); fwd( const fwd& f ); diff --git a/libraries/fc/include/fc/fwd_impl.hpp b/libraries/fc/include/fc/fwd_impl.hpp index 639085abe81..db1432621f0 100644 --- a/libraries/fc/include/fc/fwd_impl.hpp +++ b/libraries/fc/include/fc/fwd_impl.hpp @@ -76,7 +76,12 @@ namespace fc { check_size(); new (this) T( fc::forward(u), fc::forward(v), fc::forward(x), fc::forward(y) ); } - + template + template + fwd::fwd( U&& u, V&& v, X&& x, Y&& y, Z&& z ) { + check_size(); + new (this) T( fc::forward(u), fc::forward(v), fc::forward(x), fc::forward(y), fc::forward(z) ); + } template fwd::fwd() { From 65407a5da93be11789b1d3ac7e119b13c6878710 Mon Sep 17 00:00:00 2001 From: Matt Witherspoon <32485495+spoonincode@users.noreply.github.com> Date: Wed, 15 Jul 2020 17:36:12 -0400 Subject: [PATCH 4/9] use a strand for all operations & just support passing an io_context for now. Move most the impl to a .cpp since it's not templated any longer --- .../retrying_amqp_connection.hpp | 242 ++--------------- .../retrying_amqp_connection.cpp | 254 ++++++++++++++++++ 2 files changed, 273 insertions(+), 223 deletions(-) diff --git a/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp b/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp index 756eaca8054..05f73f554aa 100644 --- a/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp +++ b/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp @@ -14,254 +14,50 @@ */ namespace eosio { -template -struct retrying_amqp_connection : public AMQP::ConnectionHandler { +struct retrying_amqp_connection { using connection_ready_callback_t = std::function; using connection_failed_callback_t = std::function; - /// \param executor executor to use for all asio operations; use a strand when appropriate as one is not used internally + /// \param io_context a strand is created on this io_context for all asio operatoins /// \param address AMQP address to connect to /// \param ready a callback when the AMQP connection has been established /// \param failed a callback when the AMQP connection has failed after being established; should no longer use the AMQP::Connection* after this callback /// \param logger logger to send logging to - retrying_amqp_connection(Executor& executor, const AMQP::Address& address, connection_ready_callback_t ready, - connection_failed_callback_t failed, fc::logger logger = fc::logger::get()) : - _executor(executor), _resolver(executor), _sock(executor), _timer(executor), _address(address), - _ready_callback(ready), _failed_callback(failed), _logger(logger) { + retrying_amqp_connection(boost::asio::io_context& io_context, const AMQP::Address& address, connection_ready_callback_t ready, + connection_failed_callback_t failed, fc::logger logger = fc::logger::get()); - FC_ASSERT(!_address.secure(), "Only amqp:// URIs are supported for AMQP addresses (${a})", ("a", _address)); - start_connection(); - } + const AMQP::Address& address() const; - const AMQP::Address& address() const { - return _address; - } + boost::asio::io_context::strand& strand(); -protected: - void onReady(AMQP::Connection* connection) override { - fc_ilog(_logger, "AMQP connection to ${s} is fully operational", ("s", _address)); - - _ready_callback(connection); - _indicated_ready = true; - } - - void onData(AMQP::Connection* connection, const char* data, size_t size) override { - if(!_sock.is_open()) - return; - _state->outgoing_queue.emplace_back(data, data+size); - send_some(); - } - - void onError(AMQP::Connection* connection, const char* message) override { - fc_elog(_logger, "AMQP connection to ${s} suffered an error; will retry shortly: ${m}", ("s", _address)("m", message)); - schedule_retry(); - } - - void onClosed(AMQP::Connection *connection) override { - fc_wlog(_logger, "AMQP connection to ${s} closed AMQP connection", ("s", _address)); - schedule_retry(); - } + ~retrying_amqp_connection(); private: - void start_connection() { - _resolver.async_resolve(_address.hostname(), std::to_string(_address.port()), boost::asio::bind_executor(_executor, [this](const auto ec, const auto endpoints) { - if(ec) { - if(ec != boost::asio::error::operation_aborted) { - fc_wlog(_logger, "Failed resolving AMQP server ${s}; will retry shortly: ${m}", ("s", _address)("m", ec.message())); - schedule_retry(); - } - return; - } - //AMQP::Connection's dtor will attempt to send a last gasp message. Resetting state here is a little easier to prove - // as being safe as it requires pumping the event loop once vs placing the state reset directly in schedule_retry() - _state.emplace(); - boost::asio::async_connect(_sock, endpoints, boost::asio::bind_executor(_executor, [this](const auto ec, const auto endpoint) { - if(ec) { - if(ec != boost::asio::error::operation_aborted) { - fc_wlog(_logger, "Failed connecting AMQP server ${s}; will retry shortly: ${m}", ("s", _address)("m", ec.message())); - schedule_retry(); - } - return; - } - fc_ilog(_logger, "TCP connection to AMQP server at ${s} is up", ("s", _address)); - receive_some(); - _state->amqp_connection.emplace(this, _address.login(), _address.vhost()); - })); - })); - } - - void schedule_retry() { - _sock.close(); - _resolver.cancel(); - - //calling the failure callback will likely cause downstream users to take action such as closing an AMQP::Channel which - // will attempt to send data. Ensure that _sock is closed before then so onData() will drop those attempts - if(_indicated_ready) - _failed_callback(); - _indicated_ready = false; - - boost::system::error_code ec; - _timer.expires_from_now(std::chrono::seconds(1), ec); - if(ec) - return; - _timer.async_wait(boost::asio::bind_executor(_executor, [this](const auto ec) { - if(ec) - return; - start_connection(); - })); - } - - void send_some() { - if(_state->send_outstanding || _state->outgoing_queue.empty()) - return; - _state->send_outstanding = true; - boost::asio::async_write(_sock, boost::asio::buffer(_state->outgoing_queue.front()), boost::asio::bind_executor(_executor, [this](const auto& ec, size_t wrote) { - if(ec) { - if(ec != boost::asio::error::operation_aborted) { - fc_wlog(_logger, "Failed writing to AMQP server ${s}; connection will retry shortly: ${m}", ("s", _address)("m", ec.message())); - schedule_retry(); - } - return; - } - _state->outgoing_queue.pop_front(); - _state->send_outstanding = false; - send_some(); - })); - } - - void receive_some() { - _sock.async_read_some(boost::asio::buffer(_read_buff), boost::asio::bind_executor(_executor, [this](const auto& ec, size_t sz) { - if(ec) { - if(ec != boost::asio::error::operation_aborted) { - fc_wlog(_logger, "Failed reading from AMQP server ${s}; connection will retry shortly: ${m}", ("s", _address)("m", ec.message())); - schedule_retry(); - } - return; - } - _state->read_queue.insert(_state->read_queue.end(), _read_buff, _read_buff + sz); - auto used = _state->amqp_connection->parse(_state->read_queue.data(), _state->read_queue.size()); - _state->read_queue.erase(_state->read_queue.begin(), _state->read_queue.begin()+used); - - //parse() could have resulted in an error on a channel. In an earlier implementation users could call a function - // which may have caused a socket connection to .close() due to that. This check below may no longer be needed since - // retrying_amqp_connection no longer exposes a way to "bump" a connection - if(_sock.is_open()) - receive_some(); - })); - } - - char _read_buff[64*1024]; - - Executor& _executor; - - boost::asio::ip::tcp::resolver _resolver; - boost::asio::ip::tcp::socket _sock; - boost::asio::steady_timer _timer; - - AMQP::Address _address; - - connection_ready_callback_t _ready_callback; - connection_failed_callback_t _failed_callback; - bool _indicated_ready = false; - - fc::logger _logger; - - struct state { - state() {} - - std::deque> outgoing_queue; - bool send_outstanding = false; - - std::vector read_queue; - - std::optional amqp_connection; - }; - std::optional _state; - //be aware that AMQP::Connection's dtor will attempt to send a last gasp message on dtor. This means _state's - // destruction will cause onData() to be called when _state's amqp_connection dtor is fired. So be mindful of member - // dtor ordering here as _state & _sock will be accessed during dtor + struct impl; + constexpr static size_t fwd_size = 66424; + fc::fwd my; }; -template struct single_channel_retrying_amqp_connection { using channel_ready_callback_t = std::function; using failed_callback_t = std::function; - /// \param executor executor to use for all asio operations; use a strand when appropriate as one is not used internally + /// \param io_context a strand is created on this io_context for all asio operatoins /// \param address AMQP address to connect to /// \param ready a callback when the AMQP channel has been established /// \param failed a callback when the AMQP channel has failed after being established; should no longer use the AMQP::Channel* within or after this callback /// \param logger logger to send logging to - single_channel_retrying_amqp_connection(Executor& executor, const AMQP::Address& address, channel_ready_callback_t ready, - failed_callback_t failed, fc::logger logger = fc::logger::get()) : - _executor(executor), - _connection(executor, address, [this](AMQP::Connection* c){conn_ready(c);},[this](){conn_failed();}, logger), - _timer(executor), _channel_ready(ready), _failed(failed) - {} - - const AMQP::Address& address() const { - return _connection.address(); - } + single_channel_retrying_amqp_connection(boost::asio::io_context& io_context, const AMQP::Address& address, channel_ready_callback_t ready, + failed_callback_t failed, fc::logger logger = fc::logger::get()); -private: - void conn_ready(AMQP::Connection* c) { - _amqp_connection = c; - bring_up_channel(); - } - - void start_retry() { - boost::system::error_code ec; - _timer.expires_from_now(std::chrono::seconds(1), ec); - if(ec) - return; - _timer.async_wait(boost::asio::bind_executor(_executor, [this](const auto ec) { - if(ec) - return; - bring_up_channel(); - })); - } - - void set_channel_on_error() { - _amqp_channel->onError([this](const char* e) { - wlog("AMQP channel failure on AMQP connection ${c}; retrying : ${m}", ("c", _connection.address())("m", e)); - _failed(); - start_retry(); - }); - } - - void bring_up_channel() { - try { - _amqp_channel.emplace(_amqp_connection); - } - catch(...) { - wlog("AMQP channel could not start for AMQP connection ${c}; retrying", ("c", _connection.address())); - start_retry(); - } - set_channel_on_error(); - _amqp_channel->onReady([this]() { - _channel_ready(&*_amqp_channel); - //in case someone tried to set their own onError()... - set_channel_on_error(); - }); - } + const AMQP::Address& address() const; - void conn_failed() { - _amqp_connection = nullptr; - _amqp_channel.reset(); - boost::system::error_code ec; - _timer.cancel(ec); - _failed(); - } + ~single_channel_retrying_amqp_connection(); - Executor& _executor; - - retrying_amqp_connection _connection; - boost::asio::steady_timer _timer; - std::optional _amqp_channel; - AMQP::Connection* _amqp_connection = nullptr; - - channel_ready_callback_t _channel_ready; - failed_callback_t _failed; +private: + struct impl; + constexpr static size_t fwd_size = 66608; + fc::fwd my; }; } diff --git a/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp b/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp index d7cc1905503..fe2f370b599 100644 --- a/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp +++ b/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp @@ -1,5 +1,259 @@ #include #include +#include + +#include + +namespace eosio { + +struct retrying_amqp_connection::impl : public AMQP::ConnectionHandler { + impl(boost::asio::io_context& io_context, const AMQP::Address& address, connection_ready_callback_t ready, + connection_failed_callback_t failed, fc::logger logger = fc::logger::get()) : + _strand(io_context), _resolver(_strand), _sock(_strand), _timer(_strand), _address(address), + _ready_callback(ready), _failed_callback(failed), _logger(logger) { + + FC_ASSERT(!_address.secure(), "Only amqp:// URIs are supported for AMQP addresses (${a})", ("a", _address)); + start_connection(); + } + + void onReady(AMQP::Connection* connection) override { + fc_ilog(_logger, "AMQP connection to ${s} is fully operational", ("s", _address)); + + _ready_callback(connection); + _indicated_ready = true; + } + + void onData(AMQP::Connection* connection, const char* data, size_t size) override { + if(!_sock.is_open()) + return; + _state->outgoing_queue.emplace_back(data, data+size); + send_some(); + } + + void onError(AMQP::Connection* connection, const char* message) override { + fc_elog(_logger, "AMQP connection to ${s} suffered an error; will retry shortly: ${m}", ("s", _address)("m", message)); + schedule_retry(); + } + + void onClosed(AMQP::Connection *connection) override { + fc_wlog(_logger, "AMQP connection to ${s} closed AMQP connection", ("s", _address)); + schedule_retry(); + } + + void start_connection() { + _resolver.async_resolve(_address.hostname(), std::to_string(_address.port()), boost::asio::bind_executor(_strand, [this](const auto ec, const auto endpoints) { + if(ec) { + if(ec != boost::asio::error::operation_aborted) { + fc_wlog(_logger, "Failed resolving AMQP server ${s}; will retry shortly: ${m}", ("s", _address)("m", ec.message())); + schedule_retry(); + } + return; + } + //AMQP::Connection's dtor will attempt to send a last gasp message. Resetting state here is a little easier to prove + // as being safe as it requires pumping the event loop once vs placing the state reset directly in schedule_retry() + _state.emplace(); + boost::asio::async_connect(_sock, endpoints, boost::asio::bind_executor(_strand, [this](const auto ec, const auto endpoint) { + if(ec) { + if(ec != boost::asio::error::operation_aborted) { + fc_wlog(_logger, "Failed connecting AMQP server ${s}; will retry shortly: ${m}", ("s", _address)("m", ec.message())); + schedule_retry(); + } + return; + } + fc_ilog(_logger, "TCP connection to AMQP server at ${s} is up", ("s", _address)); + receive_some(); + _state->amqp_connection.emplace(this, _address.login(), _address.vhost()); + })); + })); + } + + void schedule_retry() { + _sock.close(); + _resolver.cancel(); + + //calling the failure callback will likely cause downstream users to take action such as closing an AMQP::Channel which + // will attempt to send data. Ensure that _sock is closed before then so onData() will drop those attempts + if(_indicated_ready) + _failed_callback(); + _indicated_ready = false; + + boost::system::error_code ec; + _timer.expires_from_now(std::chrono::seconds(1), ec); + if(ec) + return; + _timer.async_wait(boost::asio::bind_executor(_strand, [this](const auto ec) { + if(ec) + return; + start_connection(); + })); + } + + void send_some() { + if(_state->send_outstanding || _state->outgoing_queue.empty()) + return; + _state->send_outstanding = true; + boost::asio::async_write(_sock, boost::asio::buffer(_state->outgoing_queue.front()), boost::asio::bind_executor(_strand, [this](const auto& ec, size_t wrote) { + if(ec) { + if(ec != boost::asio::error::operation_aborted) { + fc_wlog(_logger, "Failed writing to AMQP server ${s}; connection will retry shortly: ${m}", ("s", _address)("m", ec.message())); + schedule_retry(); + } + return; + } + _state->outgoing_queue.pop_front(); + _state->send_outstanding = false; + send_some(); + })); + } + + void receive_some() { + _sock.async_read_some(boost::asio::buffer(_read_buff), boost::asio::bind_executor(_strand, [this](const auto& ec, size_t sz) { + if(ec) { + if(ec != boost::asio::error::operation_aborted) { + fc_wlog(_logger, "Failed reading from AMQP server ${s}; connection will retry shortly: ${m}", ("s", _address)("m", ec.message())); + schedule_retry(); + } + return; + } + _state->read_queue.insert(_state->read_queue.end(), _read_buff, _read_buff + sz); + auto used = _state->amqp_connection->parse(_state->read_queue.data(), _state->read_queue.size()); + _state->read_queue.erase(_state->read_queue.begin(), _state->read_queue.begin()+used); + + //parse() could have resulted in an error on a channel. In an earlier implementation users could call a function + // which may have caused a socket connection to .close() due to that. This check below may no longer be needed since + // retrying_amqp_connection no longer exposes a way to "bump" a connection + if(_sock.is_open()) + receive_some(); + })); + } + + char _read_buff[64*1024]; + + boost::asio::io_context::strand _strand; + + boost::asio::ip::tcp::resolver _resolver; + boost::asio::ip::tcp::socket _sock; + boost::asio::steady_timer _timer; + + AMQP::Address _address; + + connection_ready_callback_t _ready_callback; + connection_failed_callback_t _failed_callback; + bool _indicated_ready = false; + + fc::logger _logger; + + struct state { + state() {} + + std::deque> outgoing_queue; + bool send_outstanding = false; + + std::vector read_queue; + + std::optional amqp_connection; + }; + std::optional _state; + //be aware that AMQP::Connection's dtor will attempt to send a last gasp message on dtor. This means _state's + // destruction will cause onData() to be called when _state's amqp_connection dtor is fired. So be mindful of member + // dtor ordering here as _state & _sock will be accessed during dtor +}; + + +struct single_channel_retrying_amqp_connection::impl { + using channel_ready_callback_t = single_channel_retrying_amqp_connection::channel_ready_callback_t; + using failed_callback_t = single_channel_retrying_amqp_connection::failed_callback_t; + + impl(boost::asio::io_context& io_context, const AMQP::Address& address, channel_ready_callback_t ready, + failed_callback_t failed, fc::logger logger) : + _connection(io_context, address, [this](AMQP::Connection* c){conn_ready(c);},[this](){conn_failed();}, logger), + _timer(_connection.strand()), _channel_ready(ready), _failed(failed) + {} + + void conn_ready(AMQP::Connection* c) { + _amqp_connection = c; + bring_up_channel(); + } + + void start_retry() { + boost::system::error_code ec; + _timer.expires_from_now(std::chrono::seconds(1), ec); + if(ec) + return; + _timer.async_wait(boost::asio::bind_executor(_connection.strand(), [this](const auto ec) { + if(ec) + return; + bring_up_channel(); + })); + } + + void set_channel_on_error() { + _amqp_channel->onError([this](const char* e) { + wlog("AMQP channel failure on AMQP connection ${c}; retrying : ${m}", ("c", _connection.address())("m", e)); + _failed(); + start_retry(); + }); + } + + void bring_up_channel() { + try { + _amqp_channel.emplace(_amqp_connection); + } + catch(...) { + wlog("AMQP channel could not start for AMQP connection ${c}; retrying", ("c", _connection.address())); + start_retry(); + } + set_channel_on_error(); + _amqp_channel->onReady([this]() { + _channel_ready(&*_amqp_channel); + //in case someone tried to set their own onError()... + set_channel_on_error(); + }); + } + + void conn_failed() { + _amqp_connection = nullptr; + _amqp_channel.reset(); + boost::system::error_code ec; + _timer.cancel(ec); + _failed(); + } + + retrying_amqp_connection _connection; + boost::asio::steady_timer _timer; + std::optional _amqp_channel; + AMQP::Connection* _amqp_connection = nullptr; + + channel_ready_callback_t _channel_ready; + failed_callback_t _failed; +}; + +retrying_amqp_connection::retrying_amqp_connection(boost::asio::io_context& io_context, const AMQP::Address& address, connection_ready_callback_t ready, + connection_failed_callback_t failed, fc::logger logger) : + my(io_context, address, ready, failed, logger) {} + + +const AMQP::Address& retrying_amqp_connection::address() const { + return my->_address; +} + +boost::asio::io_context::strand& retrying_amqp_connection::strand() { + return my->_strand; +} + +retrying_amqp_connection::~retrying_amqp_connection() = default; + +single_channel_retrying_amqp_connection::single_channel_retrying_amqp_connection(boost::asio::io_context& io_context, const AMQP::Address& address, channel_ready_callback_t ready, + failed_callback_t failed, fc::logger logger) : + my(io_context, address, ready, failed, logger) {} + +const AMQP::Address& single_channel_retrying_amqp_connection::address() const { + return my->_connection.address(); +} + +single_channel_retrying_amqp_connection::~single_channel_retrying_amqp_connection() = default; + +} namespace fc { void to_variant(const AMQP::Address& a, fc::variant& v) { From 0c6e8586e6d96df898042fdb19bbb1880ef839c2 Mon Sep 17 00:00:00 2001 From: Matt Witherspoon <32485495+spoonincode@users.noreply.github.com> Date: Thu, 16 Jul 2020 11:49:51 -0400 Subject: [PATCH 5/9] prevent potential double stack of start_connection() --- .../retrying_amqp_connection.cpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp b/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp index fe2f370b599..b754b6b2f85 100644 --- a/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp +++ b/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp @@ -68,6 +68,15 @@ struct retrying_amqp_connection::impl : public AMQP::ConnectionHandler { } void schedule_retry() { + //in some cases, such as an async_read & async_write both outstanding at the same time during socket failure, + // schedule_retry() can be called multiple times in quick succession. nominally this causes an already closed _sock + // to be closed(), and cancels the pending 1 second timer when restarting the 1 second timer. In theory though, if thread + // timing is particularly slow, the one second timer may have already expired (and the callback can no longer be cancelled) + // which could potentially queue up two start_connection()s. + //Bail out early if a pending timer is already running and the callback hasn't been called. + if(_retry_scheduled) + return; + _sock.close(); _resolver.cancel(); @@ -81,7 +90,9 @@ struct retrying_amqp_connection::impl : public AMQP::ConnectionHandler { _timer.expires_from_now(std::chrono::seconds(1), ec); if(ec) return; + _retry_scheduled = true; _timer.async_wait(boost::asio::bind_executor(_strand, [this](const auto ec) { + _retry_scheduled = false; if(ec) return; start_connection(); @@ -140,6 +151,7 @@ struct retrying_amqp_connection::impl : public AMQP::ConnectionHandler { connection_ready_callback_t _ready_callback; connection_failed_callback_t _failed_callback; bool _indicated_ready = false; + bool _retry_scheduled = false; fc::logger _logger; From a2c612cb5d26d241f3e37d7d9ae791cc9be203e0 Mon Sep 17 00:00:00 2001 From: Matt Witherspoon <32485495+spoonincode@users.noreply.github.com> Date: Thu, 16 Jul 2020 12:09:44 -0400 Subject: [PATCH 6/9] comment update --- .../retrying_amqp_connection/retrying_amqp_connection.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp b/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp index b754b6b2f85..c8994e9ae20 100644 --- a/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp +++ b/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp @@ -130,9 +130,10 @@ struct retrying_amqp_connection::impl : public AMQP::ConnectionHandler { auto used = _state->amqp_connection->parse(_state->read_queue.data(), _state->read_queue.size()); _state->read_queue.erase(_state->read_queue.begin(), _state->read_queue.begin()+used); - //parse() could have resulted in an error on a channel. In an earlier implementation users could call a function - // which may have caused a socket connection to .close() due to that. This check below may no longer be needed since - // retrying_amqp_connection no longer exposes a way to "bump" a connection + //parse() could have resulted in an error on an AMQP channel or on the AMQP connection (causing a onError() or + // onClosed() to be called). An error on an AMQP channel is outside the scope of retrying_amqp_connection, but an + // onError() or onClosed() would call schedule_retry() and thus _sock.close(). Check that the socket is still open before + // looping back around for another async_read if(_sock.is_open()) receive_some(); })); From 5d2ebb94bd937c63c8e206719ed24bf1b10b60dd Mon Sep 17 00:00:00 2001 From: Matt Witherspoon <32485495+spoonincode@users.noreply.github.com> Date: Thu, 16 Jul 2020 12:13:01 -0400 Subject: [PATCH 7/9] remove the paranoia around Channel's onError() (just don't do that) --- .../retrying_amqp_connection.hpp | 2 +- .../retrying_amqp_connection.cpp | 16 +++++----------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp b/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp index 05f73f554aa..62f783a43ff 100644 --- a/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp +++ b/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp @@ -44,7 +44,7 @@ struct single_channel_retrying_amqp_connection { /// \param io_context a strand is created on this io_context for all asio operatoins /// \param address AMQP address to connect to - /// \param ready a callback when the AMQP channel has been established + /// \param ready a callback when the AMQP channel has been established, do NOT set the .onError() for the passed AMQP::Channel /// \param failed a callback when the AMQP channel has failed after being established; should no longer use the AMQP::Channel* within or after this callback /// \param logger logger to send logging to single_channel_retrying_amqp_connection(boost::asio::io_context& io_context, const AMQP::Address& address, channel_ready_callback_t ready, diff --git a/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp b/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp index c8994e9ae20..ab7d908a4a7 100644 --- a/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp +++ b/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp @@ -200,14 +200,6 @@ struct single_channel_retrying_amqp_connection::impl { })); } - void set_channel_on_error() { - _amqp_channel->onError([this](const char* e) { - wlog("AMQP channel failure on AMQP connection ${c}; retrying : ${m}", ("c", _connection.address())("m", e)); - _failed(); - start_retry(); - }); - } - void bring_up_channel() { try { _amqp_channel.emplace(_amqp_connection); @@ -216,11 +208,13 @@ struct single_channel_retrying_amqp_connection::impl { wlog("AMQP channel could not start for AMQP connection ${c}; retrying", ("c", _connection.address())); start_retry(); } - set_channel_on_error(); + _amqp_channel->onError([this](const char* e) { + wlog("AMQP channel failure on AMQP connection ${c}; retrying : ${m}", ("c", _connection.address())("m", e)); + _failed(); + start_retry(); + }); _amqp_channel->onReady([this]() { _channel_ready(&*_amqp_channel); - //in case someone tried to set their own onError()... - set_channel_on_error(); }); } From 9798628ef1b6e56456fbe12d30289052807c5689 Mon Sep 17 00:00:00 2001 From: Matt Witherspoon <32485495+spoonincode@users.noreply.github.com> Date: Thu, 16 Jul 2020 22:46:19 -0400 Subject: [PATCH 8/9] just use a unique_ptr here instead of fc::fwd --- .../retrying_amqp_connection/retrying_amqp_connection.hpp | 6 ++---- .../retrying_amqp_connection/retrying_amqp_connection.cpp | 5 ++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp b/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp index 62f783a43ff..039e6a47bbb 100644 --- a/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp +++ b/libraries/retrying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp @@ -34,8 +34,7 @@ struct retrying_amqp_connection { private: struct impl; - constexpr static size_t fwd_size = 66424; - fc::fwd my; + std::unique_ptr my; }; struct single_channel_retrying_amqp_connection { @@ -56,8 +55,7 @@ struct single_channel_retrying_amqp_connection { private: struct impl; - constexpr static size_t fwd_size = 66608; - fc::fwd my; + std::unique_ptr my; }; } diff --git a/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp b/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp index ab7d908a4a7..8de1263ea8c 100644 --- a/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp +++ b/libraries/retrying_amqp_connection/retrying_amqp_connection.cpp @@ -1,6 +1,5 @@ #include #include -#include #include @@ -237,7 +236,7 @@ struct single_channel_retrying_amqp_connection::impl { retrying_amqp_connection::retrying_amqp_connection(boost::asio::io_context& io_context, const AMQP::Address& address, connection_ready_callback_t ready, connection_failed_callback_t failed, fc::logger logger) : - my(io_context, address, ready, failed, logger) {} + my(new impl(io_context, address, ready, failed, logger)) {} const AMQP::Address& retrying_amqp_connection::address() const { @@ -252,7 +251,7 @@ retrying_amqp_connection::~retrying_amqp_connection() = default; single_channel_retrying_amqp_connection::single_channel_retrying_amqp_connection(boost::asio::io_context& io_context, const AMQP::Address& address, channel_ready_callback_t ready, failed_callback_t failed, fc::logger logger) : - my(io_context, address, ready, failed, logger) {} + my(new impl(io_context, address, ready, failed, logger)) {} const AMQP::Address& single_channel_retrying_amqp_connection::address() const { return my->_connection.address(); From e2d15cd9c74d13824a12ee3447b5f5a0c92449a6 Mon Sep 17 00:00:00 2001 From: Matt Witherspoon <32485495+spoonincode@users.noreply.github.com> Date: Thu, 16 Jul 2020 22:46:46 -0400 Subject: [PATCH 9/9] Revert "support for fc::fwd with 5 ctor args" This reverts commit 1c401e1f47836cfa6683e4eb85c6ab6b091e023c. --- libraries/fc/include/fc/fwd.hpp | 1 - libraries/fc/include/fc/fwd_impl.hpp | 7 +------ 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/libraries/fc/include/fc/fwd.hpp b/libraries/fc/include/fc/fwd.hpp index 1a5a01cf050..ccf0828e58b 100644 --- a/libraries/fc/include/fc/fwd.hpp +++ b/libraries/fc/include/fc/fwd.hpp @@ -13,7 +13,6 @@ class fwd { template fwd( U&& u ); template fwd( U&& u, V&& v ); template fwd( U&& u, V&& v, X&&, Y&& ); - template fwd( U&& u, V&& v, X&&, Y&&, Z&& ); fwd(); fwd( const fwd& f ); diff --git a/libraries/fc/include/fc/fwd_impl.hpp b/libraries/fc/include/fc/fwd_impl.hpp index db1432621f0..639085abe81 100644 --- a/libraries/fc/include/fc/fwd_impl.hpp +++ b/libraries/fc/include/fc/fwd_impl.hpp @@ -76,12 +76,7 @@ namespace fc { check_size(); new (this) T( fc::forward(u), fc::forward(v), fc::forward(x), fc::forward(y) ); } - template - template - fwd::fwd( U&& u, V&& v, X&& x, Y&& y, Z&& z ) { - check_size(); - new (this) T( fc::forward(u), fc::forward(v), fc::forward(x), fc::forward(y), fc::forward(z) ); - } + template fwd::fwd() {