Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Keep http_plugin_impl alive while connection objects are alive #9512

Merged
merged 3 commits into from
Sep 24, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 68 additions & 51 deletions plugins/http_plugin/http_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,20 @@ namespace eosio {
#endif
using websocket_server_tls_type = websocketpp::server<detail::asio_with_stub_log<websocketpp::transport::asio::tls_socket::endpoint>>;
using ssl_context_ptr = websocketpp::lib::shared_ptr<websocketpp::lib::asio::ssl::context>;
using http_plugin_impl_ptr = std::shared_ptr<class http_plugin_impl>;

static bool verbose_http_errors = false;

class http_plugin_impl {
class http_plugin_impl : public std::enable_shared_from_this<http_plugin_impl> {
public:
http_plugin_impl() = default;

http_plugin_impl(const http_plugin_impl&) = delete;
http_plugin_impl(http_plugin_impl&&) = delete;

http_plugin_impl& operator=(const http_plugin_impl&) = delete;
http_plugin_impl& operator=(http_plugin_impl&&) = delete;

// key -> priority, url_handler
map<string,detail::internal_url_handler> url_handlers;
std::optional<tcp::endpoint> listen_endpoint;
Expand Down Expand Up @@ -215,7 +224,7 @@ namespace eosio {
websocket_local_server_type unix_server;
#endif

bool validate_host;
bool validate_host = true;
set<string> valid_hosts;

bool host_port_is_valid( const std::string& header_host_port, const string& endpoint_local_host_port ) {
Expand Down Expand Up @@ -375,29 +384,30 @@ namespace eosio {
*/
template<typename T>
struct abstract_conn_impl : public detail::abstract_conn {
abstract_conn_impl(detail::connection_ptr<T> conn, http_plugin_impl& impl)
abstract_conn_impl(detail::connection_ptr<T> conn, http_plugin_impl_ptr impl)
:_conn(std::move(conn))
,_impl(impl)
,_impl(std::move(impl))
{
_impl.requests_in_flight += 1;
_impl->requests_in_flight += 1;
}

~abstract_conn_impl() {
_impl.requests_in_flight -= 1;
_impl->requests_in_flight -= 1;
}

// No copy constructor and no move
abstract_conn_impl(abstract_conn_impl&) = delete;
abstract_conn_impl(const abstract_conn_impl&) = delete;
abstract_conn_impl(abstract_conn_impl&&) = delete;
abstract_conn_impl& operator=(const abstract_conn_impl&) = delete;

abstract_conn_impl& operator=(abstract_conn_impl&&) noexcept = default;

bool verify_max_bytes_in_flight() override {
return _impl.verify_max_bytes_in_flight(_conn);
return _impl->verify_max_bytes_in_flight(_conn);
}

bool verify_max_requests_in_flight() override {
return _impl.verify_max_requests_in_flight(_conn);
return _impl->verify_max_requests_in_flight(_conn);
}

void handle_exception()override {
Expand All @@ -411,19 +421,19 @@ namespace eosio {
}

detail::connection_ptr<T> _conn;
http_plugin_impl &_impl;
http_plugin_impl_ptr _impl;
};

/**
* Helper to construct an abstract_conn_impl for a given connection and instance of http_plugin_impl
* @tparam T - The downstream parameter for the connection_ptr
* @param conn - existing connection_ptr<T>
* @param impl - reference to the ownint http_plugin_impl
* @param impl - the owning http_plugin_impl
* @return abstract_conn_ptr backed by type specific implementations of the methods
*/
template<typename T>
static detail::abstract_conn_ptr make_abstract_conn_ptr( detail::connection_ptr<T> conn, http_plugin_impl& impl ) {
return std::make_shared<abstract_conn_impl<T>>(conn, impl);
static detail::abstract_conn_ptr make_abstract_conn_ptr( detail::connection_ptr<T> conn, http_plugin_impl_ptr impl ) {
return std::make_shared<abstract_conn_impl<T>>(std::move(conn), std::move(impl));
}

/**
Expand All @@ -434,66 +444,66 @@ namespace eosio {
*/
template<typename T>
struct in_flight {
in_flight(T&& object, http_plugin_impl& impl)
:object(std::move(object))
,impl(impl)
in_flight(T&& object, http_plugin_impl_ptr impl)
:_object(std::move(object))
,_impl(std::move(impl))
{
count = detail::in_flight_sizeof(this->object);
impl.bytes_in_flight += count;
_count = detail::in_flight_sizeof(_object);
_impl->bytes_in_flight += _count;
}

~in_flight() {
if (count) {
impl.bytes_in_flight -= count;
if (_count) {
_impl->bytes_in_flight -= _count;
}
}

// No copy constructor, but allow move
in_flight(const in_flight&) = delete;
in_flight(in_flight&& from)
:object(std::move(from.object))
,count(from.count)
,impl(from.impl)
:_object(std::move(from._object))
,_count(from._count)
,_impl(std::move(from._impl))
{
from.count = 0;
from._count = 0;
}

// No copy assignment, but allow move
in_flight& operator=(const in_flight&) = delete;
in_flight& operator=(in_flight&& from) {
object = std::move(from.object);
count = from.count;
impl = from.impl;
from.count = 0;
_object = std::move(from._object);
_count = from._count;
_impl = std::move(from._impl);
from._count = 0;
}

/**
* const accessor
* @return const reference to the contained object
*/
const T& operator* () const {
return object;
return _object;
}

/**
* mutable accessor (can be moved frmo)
* @return mutable reference to the contained object
*/
T& operator* () {
return object;
return _object;
}

T object;
size_t count;
http_plugin_impl& impl;
T _object;
size_t _count;
http_plugin_impl_ptr _impl;
};

/**
* convenient wrapper to make an in_flight<T>
*/
template<typename T>
static auto make_in_flight(T&& object, http_plugin_impl& impl) {
return std::make_shared<in_flight<T>>(std::forward<T>(object), impl);
static auto make_in_flight(T&& object, http_plugin_impl_ptr impl) {
return std::make_shared<in_flight<T>>(std::forward<T>(object), std::move(impl));
}

/**
Expand All @@ -503,18 +513,19 @@ namespace eosio {
* @pre b.size() has been added to bytes_in_flight by caller
* @param priority - priority to post to the app thread at
* @param next - the next handler for responses
* @param my - the http_plugin_impl
* @return the constructed internal_url_handler
*/
detail::internal_url_handler make_app_thread_url_handler( int priority, url_handler next ) {
static detail::internal_url_handler make_app_thread_url_handler( int priority, url_handler next, http_plugin_impl_ptr my ) {
auto next_ptr = std::make_shared<url_handler>(std::move(next));
return [this, priority, next_ptr=std::move(next_ptr)]( detail::abstract_conn_ptr conn, string r, string b, url_response_callback then ) mutable {
auto tracked_b = make_in_flight<string>(std::move(b), *this);
return [my=std::move(my), priority, next_ptr=std::move(next_ptr)]
( detail::abstract_conn_ptr conn, string r, string b, url_response_callback then ) {
auto tracked_b = make_in_flight<string>(std::move(b), my);
if (!conn->verify_max_bytes_in_flight()) {
return;
}

url_response_callback wrapped_then = [tracked_b, then=std::move(then)](int code, fc::variant resp)
{
url_response_callback wrapped_then = [tracked_b, then=std::move(then)](int code, fc::variant resp) {
then(code, std::move(resp));
};

Expand All @@ -538,7 +549,7 @@ namespace eosio {
* @param next - the next handler for responses
* @return the constructed internal_url_handler
*/
detail::internal_url_handler make_http_thread_url_handler(url_handler next) {
static detail::internal_url_handler make_http_thread_url_handler(url_handler next) {
return [next=std::move(next)]( detail::abstract_conn_ptr conn, string r, string b, url_response_callback then ) {
try {
next(std::move(r), std::move(b), std::move(then));
Expand All @@ -556,18 +567,19 @@ namespace eosio {
* @return lambda suitable for url_response_callback
*/
template<typename T>
auto make_http_response_handler( detail::abstract_conn_ptr abstract_conn_ptr ) {
return [this, abstract_conn_ptr]( int code, fc::variant response ) {
auto tracked_response = make_in_flight(std::move(response), *this);
auto make_http_response_handler( detail::abstract_conn_ptr abstract_conn_ptr) {
return [my=shared_from_this(), abstract_conn_ptr]( int code, fc::variant response ) {
auto tracked_response = make_in_flight(std::move(response), my);
if (!abstract_conn_ptr->verify_max_bytes_in_flight()) {
return;
}

// post back to an HTTP thread to to allow the response handler to be called from any thread
boost::asio::post( thread_pool->get_executor(), [this, abstract_conn_ptr, code, tracked_response=std::move(tracked_response)]() {
boost::asio::post( my->thread_pool->get_executor(),
[my, abstract_conn_ptr, code, tracked_response=std::move(tracked_response)]() {
try {
std::string json = fc::json::to_string( *(*tracked_response), fc::time_point::now() + max_response_time );
auto tracked_json = make_in_flight(std::move(json), *this);
std::string json = fc::json::to_string( *(*tracked_response), fc::time_point::now() + my->max_response_time );
auto tracked_json = make_in_flight(std::move(json), my);
abstract_conn_ptr->send_response(std::move(*(*tracked_json)), code);
} catch( ... ) {
abstract_conn_ptr->handle_exception();
Expand Down Expand Up @@ -605,7 +617,7 @@ namespace eosio {
con->append_header( "Content-type", "application/json" );
con->defer_http_response();

auto abstract_conn_ptr = make_abstract_conn_ptr<T>(con, *this);
auto abstract_conn_ptr = make_abstract_conn_ptr<T>(con, shared_from_this());
if( !verify_max_bytes_in_flight( con ) || !verify_max_requests_in_flight( con ) ) return;

std::string resource = con->get_uri()->get_resource();
Expand Down Expand Up @@ -633,7 +645,7 @@ namespace eosio {
ws.init_asio( &thread_pool->get_executor() );
ws.set_reuse_addr(true);
ws.set_max_http_body_size(max_body_size);
// capture server_ioc shared_ptr in http handler to keep it alive while in use
// captures `this` & ws, my needs to live as long as server is handling requests
ws.set_http_handler([&](connection_hdl hdl) {
handle_http_request<detail::asio_with_stub_log<T>>(ws.get_con_from_hdl(hdl));
});
Expand Down Expand Up @@ -863,7 +875,8 @@ namespace eosio {
my->unix_server.init_asio( &my->thread_pool->get_executor() );
my->unix_server.set_max_http_body_size(my->max_body_size);
my->unix_server.listen(*my->unix_endpoint);
my->unix_server.set_http_handler([&, &ioc = my->thread_pool->get_executor()](connection_hdl hdl) {
// captures `this`, my needs to live as long as unix_server is handling requests
my->unix_server.set_http_handler([this](connection_hdl hdl) {
my->handle_http_request<detail::asio_local_with_stub_log>( my->unix_server.get_con_from_hdl(hdl));
});
my->unix_server.start_accept();
Expand Down Expand Up @@ -936,14 +949,18 @@ namespace eosio {

if( my->thread_pool ) {
my->thread_pool->stop();
my->thread_pool.reset();
}

// release http_plugin_impl_ptr shared_ptrs captured in url handlers
my->url_handlers.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't quite enough. The thread pool's io_context may contain unexecuted handlers that hold shared_ptrs to the http_plugin_impl.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not following. The io_context destructor will destroy all the unexecuted handlers that hold shared_ptrs to http_plugin_impl.
https://www.boost.org/doc/libs/develop/doc/html/boost_asio/reference/io_context/_io_context.html

Do you just mean the comment is not clear? Or do you see an issue here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're only stopping the thread pool, not resetting it. This leaves a shared_ptr cycle.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. Thanks!


app().post( 0, [me = my](){} ); // keep my pointer alive until queue is drained
}

void http_plugin::add_handler(const string& url, const url_handler& handler, int priority) {
fc_ilog( logger, "add api url: ${c}", ("c", url) );
my->url_handlers[url] = my->make_app_thread_url_handler(priority, handler);
my->url_handlers[url] = my->make_app_thread_url_handler(priority, handler, my);
}

void http_plugin::add_async_handler(const string& url, const url_handler& handler) {
Expand Down