This repository has been archived by the owner on Aug 2, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3.6k
retrying_amqp_connection: commonize more amqp connection management stuff #9294
Merged
Merged
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
e947b9c
retrying_amqp_connection
spoonincode 50ab2c9
tweak retry_connection() ordering based on recent tests
spoonincode 1c401e1
support for fc::fwd with 5 ctor args
spoonincode 65407a5
use a strand for all operations
spoonincode 0c6e858
prevent potential double stack of start_connection()
spoonincode a2c612c
comment update
spoonincode 5d2ebb9
remove the paranoia around Channel's onError() (just don't do that)
spoonincode 9798628
just use a unique_ptr here instead of fc::fwd
spoonincode e2d15cd
Revert "support for fc::fwd with 5 ctor args"
spoonincode File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
add_library(retrying_amqp_connection retrying_amqp_connection.cpp) | ||
target_include_directories(retrying_amqp_connection PUBLIC include) | ||
target_link_libraries(retrying_amqp_connection fc amqpcpp) |
65 changes: 65 additions & 0 deletions
65
...rying_amqp_connection/include/eosio/retrying_amqp_connection/retrying_amqp_connection.hpp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
#pragma once | ||
|
||
#include <amqpcpp.h> | ||
|
||
#include <boost/asio.hpp> | ||
|
||
#include <fc/log/logger.hpp> | ||
#include <fc/exception/exception.hpp> | ||
|
||
/* | ||
* A retrying_amqp_connection manages a connection to an AMQP server that will retry the connection | ||
* on failure. Most users should consider single_channel_retrying_amqp_connection instead, which additionally | ||
* manages a single channel. | ||
*/ | ||
|
||
namespace eosio { | ||
// Public handle for a retrying AMQP connection. All state lives in a private impl object held through | ||
// a std::unique_ptr (pimpl), so this type only exposes construction, two accessors and destruction. | ||
struct retrying_amqp_connection { | ||
// Invoked with the AMQP::Connection once the connection has been established | ||
using connection_ready_callback_t = std::function<void(AMQP::Connection*)>; | ||
// Invoked when a previously established connection has failed | ||
using connection_failed_callback_t = std::function<void()>; | ||
|
||
/// \param io_context a strand is created on this io_context for all asio operations | ||
/// \param address AMQP address to connect to | ||
/// \param ready a callback when the AMQP connection has been established | ||
/// \param failed a callback when the AMQP connection has failed after being established; should no longer use the AMQP::Connection* after this callback | ||
/// \param logger logger to send logging to | ||
retrying_amqp_connection(boost::asio::io_context& io_context, const AMQP::Address& address, connection_ready_callback_t ready, | ||
connection_failed_callback_t failed, fc::logger logger = fc::logger::get()); | ||
|
||
/// \return the AMQP address stored by the implementation object | ||
const AMQP::Address& address() const; | ||
|
||
/// \return the strand stored by the implementation object | ||
boost::asio::io_context::strand& strand(); | ||
|
||
// Declared here and defined out of line because impl is an incomplete type in this header | ||
~retrying_amqp_connection(); | ||
|
||
private: | ||
// Forward-declared implementation type; defined in the .cpp | ||
struct impl; | ||
std::unique_ptr<impl> my; | ||
}; | ||
|
||
// Public handle for a retrying AMQP connection that additionally manages a single AMQP::Channel. | ||
// All state lives in a private impl object held through a std::unique_ptr (pimpl). | ||
struct single_channel_retrying_amqp_connection { | ||
// Invoked with the AMQP::Channel once the channel has been established | ||
using channel_ready_callback_t = std::function<void(AMQP::Channel*)>; | ||
// Invoked when a previously established channel has failed | ||
using failed_callback_t = std::function<void()>; | ||
|
||
/// \param io_context a strand is created on this io_context for all asio operations | ||
/// \param address AMQP address to connect to | ||
/// \param ready a callback when the AMQP channel has been established, do NOT set the .onError() for the passed AMQP::Channel | ||
/// \param failed a callback when the AMQP channel has failed after being established; should no longer use the AMQP::Channel* within or after this callback | ||
/// \param logger logger to send logging to | ||
single_channel_retrying_amqp_connection(boost::asio::io_context& io_context, const AMQP::Address& address, channel_ready_callback_t ready, | ||
failed_callback_t failed, fc::logger logger = fc::logger::get()); | ||
|
||
/// \return the AMQP address of the underlying retrying_amqp_connection | ||
const AMQP::Address& address() const; | ||
|
||
// Declared here and defined out of line because impl is an incomplete type in this header | ||
~single_channel_retrying_amqp_connection(); | ||
|
||
private: | ||
// Forward-declared implementation type; defined in the .cpp | ||
struct impl; | ||
std::unique_ptr<impl> my; | ||
}; | ||
|
||
} | ||
|
||
namespace fc { | ||
// Converts an AMQP::Address into a string-valued fc::variant so addresses can be passed as log arguments; | ||
// the definition writes ":********" in place of the password. | ||
void to_variant(const AMQP::Address& a, fc::variant& v); | ||
} |
277 changes: 277 additions & 0 deletions
277
libraries/retrying_amqp_connection/retrying_amqp_connection.cpp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,277 @@ | ||
#include <amqpcpp.h> | ||
#include <fc/variant.hpp> | ||
|
||
#include <eosio/retrying_amqp_connection/retrying_amqp_connection.hpp> | ||
|
||
#include <memory> | ||
#include <utility> | ||
|
||
namespace eosio { | ||
|
||
// Implementation of retrying_amqp_connection. It is the AMQP::ConnectionHandler for the AMQP::Connection it | ||
// creates, and moves bytes between that connection and a TCP socket. Every asio completion handler is bound | ||
// to _strand. Any failure (resolve, connect, read, write, AMQP error/close) funnels into schedule_retry(), | ||
// which closes the socket and restarts start_connection() after a one second timer. | ||
struct retrying_amqp_connection::impl : public AMQP::ConnectionHandler { | ||
// Stores the arguments, rejects secure (amqps) addresses via FC_ASSERT, then kicks off the first connection attempt. | ||
impl(boost::asio::io_context& io_context, const AMQP::Address& address, connection_ready_callback_t ready, | ||
connection_failed_callback_t failed, fc::logger logger = fc::logger::get()) : | ||
_strand(io_context), _resolver(_strand), _sock(_strand), _timer(_strand), _address(address), | ||
_ready_callback(ready), _failed_callback(failed), _logger(logger) { | ||
|
||
FC_ASSERT(!_address.secure(), "Only amqp:// URIs are supported for AMQP addresses (${a})", ("a", _address)); | ||
start_connection(); | ||
} | ||
|
||
// ConnectionHandler hook: logs, hands the connection to the user's ready callback, and records that ready was | ||
// indicated so that a later failure is reported through the failed callback (see schedule_retry()). | ||
void onReady(AMQP::Connection* connection) override { | ||
fc_ilog(_logger, "AMQP connection to ${s} is fully operational", ("s", _address)); | ||
|
||
_ready_callback(connection); | ||
_indicated_ready = true; | ||
} | ||
|
||
// ConnectionHandler hook: the AMQP library has bytes to send. They are dropped when the socket is closed; | ||
// otherwise they are copied onto the outgoing queue and a write is started if none is in flight. | ||
void onData(AMQP::Connection* connection, const char* data, size_t size) override { | ||
if(!_sock.is_open()) | ||
return; | ||
_state->outgoing_queue.emplace_back(data, data+size); | ||
send_some(); | ||
} | ||
|
||
// ConnectionHandler hook: log the error message and schedule a reconnect. | ||
void onError(AMQP::Connection* connection, const char* message) override { | ||
fc_elog(_logger, "AMQP connection to ${s} suffered an error; will retry shortly: ${m}", ("s", _address)("m", message)); | ||
schedule_retry(); | ||
} | ||
|
||
// ConnectionHandler hook: log the close and schedule a reconnect. | ||
void onClosed(AMQP::Connection *connection) override { | ||
fc_wlog(_logger, "AMQP connection to ${s} closed AMQP connection", ("s", _address)); | ||
schedule_retry(); | ||
} | ||
|
||
// Resolves the address, then connects the socket. On success it starts the read loop and constructs the | ||
// AMQP::Connection with this object as its handler. A failure other than operation_aborted schedules a retry. | ||
void start_connection() { | ||
_resolver.async_resolve(_address.hostname(), std::to_string(_address.port()), boost::asio::bind_executor(_strand, [this](const auto ec, const auto endpoints) { | ||
if(ec) { | ||
if(ec != boost::asio::error::operation_aborted) { | ||
fc_wlog(_logger, "Failed resolving AMQP server ${s}; will retry shortly: ${m}", ("s", _address)("m", ec.message())); | ||
schedule_retry(); | ||
} | ||
return; | ||
} | ||
//AMQP::Connection's dtor will attempt to send a last gasp message. Resetting state here is a little easier to prove | ||
// as being safe as it requires pumping the event loop once vs placing the state reset directly in schedule_retry() | ||
_state.emplace(); | ||
boost::asio::async_connect(_sock, endpoints, boost::asio::bind_executor(_strand, [this](const auto ec, const auto endpoint) { | ||
if(ec) { | ||
if(ec != boost::asio::error::operation_aborted) { | ||
fc_wlog(_logger, "Failed connecting AMQP server ${s}; will retry shortly: ${m}", ("s", _address)("m", ec.message())); | ||
schedule_retry(); | ||
} | ||
return; | ||
} | ||
fc_ilog(_logger, "TCP connection to AMQP server at ${s} is up", ("s", _address)); | ||
receive_some(); | ||
_state->amqp_connection.emplace(this, _address.login(), _address.vhost()); | ||
})); | ||
})); | ||
} | ||
|
||
// Closes the socket, cancels any pending resolve, reports failure to the user if ready had been indicated, | ||
// and arms a one second timer whose expiry calls start_connection(). | ||
void schedule_retry() { | ||
//in some cases, such as an async_read & async_write both outstanding at the same time during socket failure, | ||
// schedule_retry() can be called multiple times in quick succession. nominally this causes an already closed _sock | ||
// to be closed(), and cancels the pending 1 second timer when restarting the 1 second timer. In theory though, if thread | ||
// timing is particularly slow, the one second timer may have already expired (and the callback can no longer be cancelled) | ||
// which could potentially queue up two start_connection()s. | ||
//Bail out early if a pending timer is already running and the callback hasn't been called. | ||
if(_retry_scheduled) | ||
return; | ||
|
||
_sock.close(); | ||
_resolver.cancel(); | ||
|
||
//calling the failure callback will likely cause downstream users to take action such as closing an AMQP::Channel which | ||
// will attempt to send data. Ensure that _sock is closed before then so onData() will drop those attempts | ||
if(_indicated_ready) | ||
_failed_callback(); | ||
_indicated_ready = false; | ||
|
||
boost::system::error_code ec; | ||
_timer.expires_from_now(std::chrono::seconds(1), ec); | ||
// NOTE(review): if setting the expiry fails, no timer is armed and no further retry is scheduled from here | ||
if(ec) | ||
return; | ||
_retry_scheduled = true; | ||
_timer.async_wait(boost::asio::bind_executor(_strand, [this](const auto ec) { | ||
// cleared before the error check so that a later schedule_retry() is not blocked by this wait | ||
_retry_scheduled = false; | ||
if(ec) | ||
return; | ||
start_connection(); | ||
})); | ||
} | ||
|
||
// Writes the front of the outgoing queue unless a write is already outstanding or the queue is empty. | ||
// On success the front entry is popped and the next one is attempted; on failure a retry is scheduled. | ||
void send_some() { | ||
if(_state->send_outstanding || _state->outgoing_queue.empty()) | ||
return; | ||
_state->send_outstanding = true; | ||
boost::asio::async_write(_sock, boost::asio::buffer(_state->outgoing_queue.front()), boost::asio::bind_executor(_strand, [this](const auto& ec, size_t wrote) { | ||
if(ec) { | ||
if(ec != boost::asio::error::operation_aborted) { | ||
fc_wlog(_logger, "Failed writing to AMQP server ${s}; connection will retry shortly: ${m}", ("s", _address)("m", ec.message())); | ||
schedule_retry(); | ||
} | ||
return; | ||
} | ||
_state->outgoing_queue.pop_front(); | ||
_state->send_outstanding = false; | ||
send_some(); | ||
})); | ||
} | ||
|
||
// Read loop: reads into _read_buff, appends the bytes to read_queue, passes the queue to the AMQP connection's | ||
// parse(), and erases from the front of the queue the number of bytes parse() returned. | ||
void receive_some() { | ||
_sock.async_read_some(boost::asio::buffer(_read_buff), boost::asio::bind_executor(_strand, [this](const auto& ec, size_t sz) { | ||
if(ec) { | ||
if(ec != boost::asio::error::operation_aborted) { | ||
fc_wlog(_logger, "Failed reading from AMQP server ${s}; connection will retry shortly: ${m}", ("s", _address)("m", ec.message())); | ||
schedule_retry(); | ||
} | ||
return; | ||
} | ||
_state->read_queue.insert(_state->read_queue.end(), _read_buff, _read_buff + sz); | ||
auto used = _state->amqp_connection->parse(_state->read_queue.data(), _state->read_queue.size()); | ||
_state->read_queue.erase(_state->read_queue.begin(), _state->read_queue.begin()+used); | ||
|
||
//parse() could have resulted in an error on an AMQP channel or on the AMQP connection (causing a onError() or | ||
// onClosed() to be called). An error on an AMQP channel is outside the scope of retrying_amqp_connection, but an | ||
// onError() or onClosed() would call schedule_retry() and thus _sock.close(). Check that the socket is still open before | ||
// looping back around for another async_read | ||
if(_sock.is_open()) | ||
receive_some(); | ||
})); | ||
} | ||
|
||
// Scratch buffer that async_read_some() fills in receive_some() | ||
char _read_buff[64*1024]; | ||
|
||
// All completion handlers in this class are bound to this strand | ||
boost::asio::io_context::strand _strand; | ||
|
||
boost::asio::ip::tcp::resolver _resolver; | ||
boost::asio::ip::tcp::socket _sock; | ||
// One second reconnect timer used by schedule_retry() | ||
boost::asio::steady_timer _timer; | ||
|
||
AMQP::Address _address; | ||
|
||
connection_ready_callback_t _ready_callback; | ||
connection_failed_callback_t _failed_callback; | ||
// true between a call to the ready callback and the next schedule_retry() | ||
bool _indicated_ready = false; | ||
// true while the reconnect timer wait is pending | ||
bool _retry_scheduled = false; | ||
|
||
fc::logger _logger; | ||
|
||
// Per-connection-attempt state; re-created by _state.emplace() in start_connection() after a successful resolve | ||
struct state { | ||
state() {} | ||
|
||
// Buffers handed over by onData() that are waiting to be written, oldest first | ||
std::deque<std::vector<char>> outgoing_queue; | ||
// true while an async_write started by send_some() has not completed successfully | ||
bool send_outstanding = false; | ||
|
||
// Received bytes that parse() has not yet consumed | ||
std::vector<char> read_queue; | ||
|
||
std::optional<AMQP::Connection> amqp_connection; | ||
}; | ||
std::optional<state> _state; | ||
//be aware that AMQP::Connection's dtor will attempt to send a last gasp message on dtor. This means _state's | ||
// destruction will cause onData() to be called when _state's amqp_connection dtor is fired. So be mindful of member | ||
// dtor ordering here as _state & _sock will be accessed during dtor | ||
}; | ||
|
||
|
||
/*
 * Implementation of single_channel_retrying_amqp_connection. Owns a retrying_amqp_connection and, whenever that
 * connection reports ready, brings up one AMQP::Channel on it. A channel failure notifies the user and retries the
 * channel after one second; a connection failure drops the channel, cancels any pending channel retry and notifies
 * the user (the underlying retrying_amqp_connection handles reconnecting on its own).
 */
struct single_channel_retrying_amqp_connection::impl {
   using channel_ready_callback_t = single_channel_retrying_amqp_connection::channel_ready_callback_t;
   using failed_callback_t = single_channel_retrying_amqp_connection::failed_callback_t;

   /// \param io_context forwarded to the underlying retrying_amqp_connection
   /// \param address AMQP address, forwarded to the underlying retrying_amqp_connection
   /// \param ready called with the channel once the channel reports ready
   /// \param failed called when the channel errors or the connection fails
   /// \param logger forwarded to the underlying retrying_amqp_connection; channel-level messages here use wlog()
   impl(boost::asio::io_context& io_context, const AMQP::Address& address, channel_ready_callback_t ready,
        failed_callback_t failed, fc::logger logger) :
     _connection(io_context, address, [this](AMQP::Connection* c){conn_ready(c);},[this](){conn_failed();}, logger),
     _timer(_connection.strand()), _channel_ready(ready), _failed(failed)
   {}

   /// Connection-ready callback: remember the connection and create the channel on it.
   void conn_ready(AMQP::Connection* c) {
      _amqp_connection = c;
      bring_up_channel();
   }

   /// Arms the one second timer (on the connection's strand) whose expiry re-attempts bring_up_channel().
   void start_retry() {
      boost::system::error_code ec;
      _timer.expires_from_now(std::chrono::seconds(1), ec);
      if(ec)
         return;
      _timer.async_wait(boost::asio::bind_executor(_connection.strand(), [this](const auto ec) {
         //a cancelled wait (see conn_failed()) arrives here with an error and must not touch the channel
         if(ec)
            return;
         bring_up_channel();
      }));
   }

   /// (Re)creates the channel on the current connection and installs its onError/onReady handlers.
   void bring_up_channel() {
      try {
         _amqp_channel.emplace(_amqp_connection);
      }
      catch(...) {
         wlog("AMQP channel could not start for AMQP connection ${c}; retrying", ("c", _connection.address()));
         start_retry();
         //emplace() threw, so _amqp_channel holds no channel; falling through would dereference an empty optional.
         // The retry timer will call bring_up_channel() again.
         return;
      }
      _amqp_channel->onError([this](const char* e) {
         wlog("AMQP channel failure on AMQP connection ${c}; retrying : ${m}", ("c", _connection.address())("m", e));
         _failed();
         start_retry();
      });
      _amqp_channel->onReady([this]() {
         _channel_ready(&*_amqp_channel);
      });
   }

   /// Connection-failed callback: forget the connection, destroy the channel, cancel any pending channel retry
   /// and tell the user.
   void conn_failed() {
      _amqp_connection = nullptr;
      _amqp_channel.reset();
      boost::system::error_code ec;
      _timer.cancel(ec);
      _failed();
   }

   retrying_amqp_connection _connection;
   //channel retry timer; constructed from _connection.strand(), so it must be declared after _connection
   boost::asio::steady_timer _timer;
   std::optional<AMQP::Channel> _amqp_channel;
   //non-owning; set in conn_ready() and cleared in conn_failed()
   AMQP::Connection* _amqp_connection = nullptr;

   channel_ready_callback_t _channel_ready;
   failed_callback_t _failed;
};
|
||
/// Creates the implementation object, whose constructor FC_ASSERTs that the address is not secure (amqps)
/// and then starts the first connection attempt.
/// The by-value callback and logger arguments are moved into the implementation rather than copied again.
retrying_amqp_connection::retrying_amqp_connection(boost::asio::io_context& io_context, const AMQP::Address& address, connection_ready_callback_t ready,
                                                   connection_failed_callback_t failed, fc::logger logger) :
  my(std::make_unique<impl>(io_context, address, std::move(ready), std::move(failed), std::move(logger))) {}
|
||
|
||
/// \return the AMQP address stored in the implementation object
const AMQP::Address& retrying_amqp_connection::address() const {
   const impl& self = *my;
   return self._address;
}
|
||
/// \return the strand stored in the implementation object, to which it binds its asio completion handlers
boost::asio::io_context::strand& retrying_amqp_connection::strand() {
   impl& self = *my;
   return self._strand;
}
|
||
// Defaulted here, where impl is a complete type, so that std::unique_ptr<impl> can destroy it | ||
retrying_amqp_connection::~retrying_amqp_connection() = default; | ||
|
||
/// Creates the implementation object, which constructs the underlying retrying_amqp_connection.
/// The by-value callback and logger arguments are moved into the implementation rather than copied again.
single_channel_retrying_amqp_connection::single_channel_retrying_amqp_connection(boost::asio::io_context& io_context, const AMQP::Address& address, channel_ready_callback_t ready,
                                                                                 failed_callback_t failed, fc::logger logger) :
  my(std::make_unique<impl>(io_context, address, std::move(ready), std::move(failed), std::move(logger))) {}
|
||
/// \return the AMQP address reported by the underlying retrying_amqp_connection
const AMQP::Address& single_channel_retrying_amqp_connection::address() const {
   const retrying_amqp_connection& underlying = my->_connection;
   return underlying.address();
}
|
||
// Defaulted here, where impl is a complete type, so that std::unique_ptr<impl> can destroy it | ||
single_channel_retrying_amqp_connection::~single_channel_retrying_amqp_connection() = default; | ||
|
||
} | ||
|
||
namespace fc { | ||
void to_variant(const AMQP::Address& a, fc::variant& v) { | ||
std::string str(a.secure() ? "amqps://" : "amqp://"); | ||
str.append(a.login().user()).append(":********").append("@"); | ||
str.append(a.hostname().empty() ? "localhost" : a.hostname()); | ||
if(a.port() != 5672) | ||
str.append(":").append(std::to_string(a.port())); | ||
str.append("/"); | ||
if (a.vhost() != "/") | ||
str.append(a.vhost()); | ||
|
||
v = str; | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be:
v = std::move(str);
as fc::variant takes string by value.