Skip to content

Commit

Permalink
Add custom I/O executor support to I/O objects.
Browse files Browse the repository at this point in the history
All I/O objects now have an additional Executor template parameter. This
template parameter defaults to the asio::executor type (the polymorphic
executor wrapper) but can be used to specify a user-defined executor
type.

I/O objects' constructors and functions that previously took an
asio::io_context& now accept either an Executor or a reference to a
concrete ExecutionContext (such as asio::io_context or
asio::thread_pool).

One potential point of breakage in existing user code is when reusing an
I/O object's io_context for constructing another I/O object, as in:

    asio::steady_timer my_timer(my_socket.get_executor().context());

To fix this, either construct the second I/O object using the first I/O
object's executor:

    asio::steady_timer my_timer(my_socket.get_executor());

or otherwise explicitly pass the io_context:

    asio::steady_timer my_timer(my_io_context);
  • Loading branch information
chriskohlhoff committed Feb 18, 2019
1 parent a72fbb0 commit 59066d8
Show file tree
Hide file tree
Showing 163 changed files with 8,245 additions and 5,159 deletions.
2 changes: 1 addition & 1 deletion example/cpp03/http/server2/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ server::server(const std::string& address, const std::string& port,
signals_.async_wait(boost::bind(&server::handle_stop, this));

// Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
boost::asio::ip::tcp::resolver resolver(acceptor_.get_executor().context());
boost::asio::ip::tcp::resolver resolver(acceptor_.get_executor());
boost::asio::ip::tcp::endpoint endpoint =
*resolver.resolve(address, port).begin();
acceptor_.open(endpoint.protocol());
Expand Down
26 changes: 11 additions & 15 deletions example/cpp03/http/server3/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace server3 {
connection::connection(boost::asio::io_context& io_context,
request_handler& handler)
: strand_(io_context),
socket_(io_context),
socket_(strand_),
request_handler_(handler)
{
}
Expand All @@ -32,10 +32,9 @@ boost::asio::ip::tcp::socket& connection::socket()
void connection::start()
{
socket_.async_read_some(boost::asio::buffer(buffer_),
boost::asio::bind_executor(strand_,
boost::bind(&connection::handle_read, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
boost::bind(&connection::handle_read, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}

void connection::handle_read(const boost::system::error_code& e,
Expand All @@ -51,25 +50,22 @@ void connection::handle_read(const boost::system::error_code& e,
{
request_handler_.handle_request(request_, reply_);
boost::asio::async_write(socket_, reply_.to_buffers(),
boost::asio::bind_executor(strand_,
boost::bind(&connection::handle_write, shared_from_this(),
boost::asio::placeholders::error)));
boost::bind(&connection::handle_write, shared_from_this(),
boost::asio::placeholders::error));
}
else if (!result)
{
reply_ = reply::stock_reply(reply::bad_request);
boost::asio::async_write(socket_, reply_.to_buffers(),
boost::asio::bind_executor(strand_,
boost::bind(&connection::handle_write, shared_from_this(),
boost::asio::placeholders::error)));
boost::bind(&connection::handle_write, shared_from_this(),
boost::asio::placeholders::error));
}
else
{
socket_.async_read_some(boost::asio::buffer(buffer_),
boost::asio::bind_executor(strand_,
boost::bind(&connection::handle_read, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
boost::bind(&connection::handle_read, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}

Expand Down
2 changes: 1 addition & 1 deletion example/cpp03/http/server4/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void server::operator()(boost::system::error_code ec, std::size_t length)
do
{
// Create a new socket for the next incoming connection.
socket_.reset(new tcp::socket(acceptor_->get_executor().context()));
socket_.reset(new tcp::socket(acceptor_->get_executor()));

// Accept a new connection. The "yield" pseudo-keyword saves the current
// line number and exits the coroutine's "reenter" block. We use the
Expand Down
10 changes: 5 additions & 5 deletions example/cpp03/nonblocking/third_party_lib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ class connection
public:
typedef boost::shared_ptr<connection> pointer;

static pointer create(boost::asio::io_context& io_context)
static pointer create(const boost::asio::executor& ex)
{
return pointer(new connection(io_context));
return pointer(new connection(ex));
}

tcp::socket& socket()
Expand All @@ -102,8 +102,8 @@ class connection
}

private:
connection(boost::asio::io_context& io_context)
: socket_(io_context),
connection(const boost::asio::executor& ex)
: socket_(ex),
session_impl_(socket_),
read_in_progress_(false),
write_in_progress_(false)
Expand Down Expand Up @@ -193,7 +193,7 @@ class server
void start_accept()
{
connection::pointer new_connection =
connection::create(acceptor_.get_executor().context());
connection::create(acceptor_.get_executor());

acceptor_.async_accept(new_connection->socket(),
boost::bind(&server::handle_accept, this, new_connection,
Expand Down
8 changes: 3 additions & 5 deletions example/cpp03/porthopper/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ class server
next_frame_number_(1)
{
// Start waiting for a new control connection.
tcp_socket_ptr new_socket(
new tcp::socket(acceptor_.get_executor().context()));
tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_executor()));
acceptor_.async_accept(*new_socket,
boost::bind(&server::handle_accept, this,
boost::asio::placeholders::error, new_socket));
Expand All @@ -60,8 +59,7 @@ class server
}

// Start waiting for a new control connection.
tcp_socket_ptr new_socket(
new tcp::socket(acceptor_.get_executor().context()));
tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_executor()));
acceptor_.async_accept(*new_socket,
boost::bind(&server::handle_accept, this,
boost::asio::placeholders::error, new_socket));
Expand All @@ -75,7 +73,7 @@ class server
{
// Delay handling of the control request to simulate network latency.
timer_ptr delay_timer(
new boost::asio::steady_timer(acceptor_.get_executor().context()));
new boost::asio::steady_timer(acceptor_.get_executor()));
delay_timer->expires_after(boost::asio::chrono::seconds(2));
delay_timer->async_wait(
boost::bind(&server::handle_control_request_timer, this,
Expand Down
6 changes: 3 additions & 3 deletions example/cpp03/services/basic_logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ class basic_logger
/**
* This constructor creates a logger.
*
* @param io_context The io_context object used to locate the logger service.
* @param context The execution context used to locate the logger service.
*
* @param identifier An identifier for this logger.
*/
explicit basic_logger(boost::asio::io_context& io_context,
explicit basic_logger(boost::asio::execution_context& context,
const std::string& identifier)
: service_(boost::asio::use_service<Service>(io_context)),
: service_(boost::asio::use_service<Service>(context)),
impl_(service_.null())
{
service_.create(impl_, identifier);
Expand Down
6 changes: 0 additions & 6 deletions example/cpp03/services/logger_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,3 @@
//

#include "logger_service.hpp"

namespace services {

boost::asio::io_context::id logger_service::id;

} // namespace services
12 changes: 6 additions & 6 deletions example/cpp03/services/logger_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ namespace services {

/// Service implementation for the logger.
class logger_service
: public boost::asio::io_context::service
: public boost::asio::execution_context::service
{
public:
/// The unique service identifier.
static boost::asio::io_context::id id;
/// The type used to identify this service in the execution context.
typedef logger_service key_type;

/// The backend implementation of a logger.
struct logger_impl
Expand All @@ -42,8 +42,8 @@ class logger_service
typedef logger_impl* impl_type;

/// Constructor creates a thread to run a private io_context.
logger_service(boost::asio::io_context& io_context)
: boost::asio::io_context::service(io_context),
logger_service(boost::asio::execution_context& context)
: boost::asio::execution_context::service(context),
work_io_context_(),
work_(boost::asio::make_work_guard(work_io_context_)),
work_thread_(new boost::thread(
Expand All @@ -62,7 +62,7 @@ class logger_service
}

/// Destroy all user-defined handler objects owned by the service.
void shutdown_service()
void shutdown()
{
}

Expand Down
12 changes: 8 additions & 4 deletions example/cpp03/timeouts/blocking_token_tcp_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@

using boost::asio::ip::tcp;

// We will use our sockets only with an io_context.
typedef boost::asio::basic_stream_socket<tcp,
boost::asio::io_context::executor_type> tcp_socket;

//----------------------------------------------------------------------

// A custom completion token that makes asynchronous operations behave as
// though they are blocking calls with a timeout.
struct close_after
{
close_after(boost::asio::chrono::steady_clock::duration t, tcp::socket& s)
close_after(boost::asio::chrono::steady_clock::duration t, tcp_socket& s)
: timeout_(t), socket_(s)
{
}
Expand All @@ -37,7 +41,7 @@ struct close_after
boost::asio::chrono::steady_clock::duration timeout_;

// The socket to be closed if the operation does not complete in time.
tcp::socket& socket_;
tcp_socket& socket_;
};

namespace boost {
Expand Down Expand Up @@ -125,7 +129,7 @@ class async_result<close_after, void(boost::system::error_code, T)>

private:
boost::asio::chrono::steady_clock::duration timeout_;
tcp::socket& socket_;
tcp_socket& socket_;
boost::system::error_code ec_;
T t_;
};
Expand All @@ -151,7 +155,7 @@ int main(int argc, char* argv[])
tcp::resolver::results_type endpoints =
tcp::resolver(io_context).resolve(argv[1], argv[2]);

tcp::socket socket(io_context);
tcp_socket socket(io_context);

// Run an asynchronous connect operation with a timeout.
boost::asio::async_connect(socket, endpoints,
Expand Down
6 changes: 4 additions & 2 deletions example/cpp03/tutorial/daytime3/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ class tcp_server
{
public:
tcp_server(boost::asio::io_context& io_context)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), 13))
: io_context_(io_context),
acceptor_(io_context, tcp::endpoint(tcp::v4(), 13))
{
start_accept();
}
Expand All @@ -79,7 +80,7 @@ class tcp_server
void start_accept()
{
tcp_connection::pointer new_connection =
tcp_connection::create(acceptor_.get_executor().context());
tcp_connection::create(io_context_);

acceptor_.async_accept(new_connection->socket(),
boost::bind(&tcp_server::handle_accept, this, new_connection,
Expand All @@ -97,6 +98,7 @@ class tcp_server
start_accept();
}

boost::asio::io_context& io_context_;
tcp::acceptor acceptor_;
};

Expand Down
6 changes: 4 additions & 2 deletions example/cpp03/tutorial/daytime7/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class tcp_server
{
public:
tcp_server(boost::asio::io_context& io_context)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), 13))
: io_context_(io_context),
acceptor_(io_context, tcp::endpoint(tcp::v4(), 13))
{
start_accept();
}
Expand All @@ -78,7 +79,7 @@ class tcp_server
void start_accept()
{
tcp_connection::pointer new_connection =
tcp_connection::create(acceptor_.get_executor().context());
tcp_connection::create(io_context_);

acceptor_.async_accept(new_connection->socket(),
boost::bind(&tcp_server::handle_accept, this, new_connection,
Expand All @@ -96,6 +97,7 @@ class tcp_server
start_accept();
}

boost::asio::io_context& io_context_;
tcp::acceptor acceptor_;
};

Expand Down
16 changes: 11 additions & 5 deletions example/cpp03/windows/transmit_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@ using boost::asio::ip::tcp;
using boost::asio::windows::overlapped_ptr;
using boost::asio::windows::random_access_handle;

typedef boost::asio::basic_stream_socket<tcp,
boost::asio::io_context::executor_type> tcp_socket;

typedef boost::asio::basic_socket_acceptor<tcp,
boost::asio::io_context::executor_type> tcp_acceptor;

// A wrapper for the TransmitFile overlapped I/O operation.
template <typename Handler>
void transmit_file(tcp::socket& socket,
void transmit_file(tcp_socket& socket,
random_access_handle& file, Handler handler)
{
// Construct an OVERLAPPED-derived object to contain the handler.
Expand Down Expand Up @@ -65,7 +71,7 @@ class connection
return pointer(new connection(io_context, filename));
}

tcp::socket& socket()
tcp_socket& socket()
{
return socket_;
}
Expand Down Expand Up @@ -96,10 +102,10 @@ class connection
size_t /*bytes_transferred*/)
{
boost::system::error_code ignored_ec;
socket_.shutdown(tcp::socket::shutdown_both, ignored_ec);
socket_.shutdown(tcp_socket::shutdown_both, ignored_ec);
}

tcp::socket socket_;
tcp_socket socket_;
std::string filename_;
random_access_handle file_;
};
Expand Down Expand Up @@ -137,7 +143,7 @@ class server
start_accept();
}

tcp::acceptor acceptor_;
tcp_acceptor acceptor_;
std::string filename_;
};

Expand Down
2 changes: 1 addition & 1 deletion example/cpp11/invocation/prioritised_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

using boost::asio::ip::tcp;

class handler_priority_queue : boost::asio::execution_context
class handler_priority_queue : public boost::asio::execution_context
{
public:
template <typename Function>
Expand Down
2 changes: 1 addition & 1 deletion example/cpp11/operations/composed_5.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ auto async_write_messages(tcp::socket& socket,

// Create a steady_timer to be used for the delay between messages.
std::unique_ptr<boost::asio::steady_timer> delay_timer(
new boost::asio::steady_timer(socket.get_executor().context()));
new boost::asio::steady_timer(socket.get_executor()));

// Initiate the underlying operations by explicitly calling our intermediate
// completion handler's function call operator.
Expand Down
Loading

10 comments on commit 59066d8

@jbeich
Copy link

@jbeich jbeich commented on 59066d8 Mar 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A number of consumers no longer build. Can you help analyzing? It looks like the issue is within Boost.

@djarek
Copy link

@djarek djarek commented on 59066d8 Mar 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

io_object.get_executor() no longer returns io_context::executor_type, the default executor is now asio::executor (a polymorphic executor). In order to get old behavior you need to use different type, e.g. for sockets you need asio::basic_stream_socket<asio::ip::tcp, asio::io_context::executor_type>. Alternatively just don't use get_executor().context() and just pass get_executor() to the constructors.

@chriskohlhoff
Copy link
Collaborator Author

@chriskohlhoff chriskohlhoff commented on 59066d8 Mar 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Several of these errors are due to this line:

https://github.com/zaphoyd/websocketpp/blob/master/websocketpp/transport/asio/security/none.hpp#L172

where it is passing a constructor argument through make_shared in a reference_wrapper. This shouldn't be required as make_shared will perfectly forward its arguments.

EDIT: I've noted this on an existing websocket issue for this problem: zaphoyd/websocketpp#794 (comment)

@apolukhin
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where it is passing a constructor argument through make_shared in a reference_wrapper. This shouldn't be required as make_shared will perfectly forward its arguments.

That won't work with boost::make_shared and pre-C++11 compilers. On those compilers make_shared forwards parameters by const reference:

In file included from /home/travis/boost-local/boost/asio/io_context.hpp:23:0,
                 from /home/travis/boost-local/boost/asio/io_service.hpp:18,
                 from ../tasks_processor_base/tasks_processor_base.hpp:11,
                 from tasks_processor_timers.hpp:8,
                 from main.cpp:4:
/home/travis/boost-local/boost/asio/execution_context.hpp: In instantiation of ‘typename boost::detail::sp_if_not_array<T>::type boost::make_shared(const A1&, const A2&) [with T = boost::asio::basic_deadline_timer<boost::posix_time::ptime>; A1 = boost::asio::io_context; A2 = boost::posix_time::time_duration; typename boost::detail::sp_if_not_array<T>::type = boost::shared_ptr<boost::asio::basic_deadline_timer<boost::posix_time::ptime> >]’:
tasks_processor_timers.hpp:35:13:   required from ‘detail::timer_task<Functor>::timer_task(boost::asio::io_service&, const Time&, const Functor&) [with Time = boost::posix_time::time_duration; Functor = boost::_bi::bind_t<void, void (*)(int&), boost::_bi::list1<boost::reference_wrapper<int> > >; boost::asio::io_service = boost::asio::io_context]’
tasks_processor_timers.hpp:59:73:   required from ‘detail::timer_task<Functor> detail::make_timer_task(boost::asio::io_service&, const Time&, const Functor&) [with Time = boost::posix_time::time_duration; Functor = boost::_bi::bind_t<void, void (*)(int&), boost::_bi::list1<boost::reference_wrapper<int> > >; boost::asio::io_service = boost::asio::io_context]’
tasks_processor_timers.hpp:73:50:   required from ‘void tp_timers::tasks_processor::run_after(tp_timers::tasks_processor::duration_type, const Functor&) [with Functor = boost::_bi::bind_t<void, void (*)(int&), boost::_bi::list1<boost::reference_wrapper<int> > >; tp_timers::tasks_processor::duration_type = boost::posix_time::time_duration]’
main.cpp:34:5:   required from here
/home/travis/boost-local/boost/asio/detail/noncopyable.hpp:32:3: error: ‘boost::asio::detail::noncopyable::noncopyable(const boost::asio::detail::noncopyable&)’ is private
   noncopyable(const noncopyable&);
   ^
In file included from /home/travis/boost-local/boost/asio/detail/scheduler.hpp:21:0,
                 from /home/travis/boost-local/boost/asio/system_context.hpp:19,
                 from /home/travis/boost-local/boost/asio/impl/system_executor.hpp:22,
                 from /home/travis/boost-local/boost/asio/system_executor.hpp:129,
                 from /home/travis/boost-local/boost/asio/associated_executor.hpp:21,
                 from /home/travis/boost-local/boost/asio/detail/bind_handler.hpp:20,
                 from /home/travis/boost-local/boost/asio/detail/wrapped_handler.hpp:18,
                 from /home/travis/boost-local/boost/asio/io_context.hpp:24,
                 from /home/travis/boost-local/boost/asio/io_service.hpp:18,
                 from ../tasks_processor_base/tasks_processor_base.hpp:11,
                 from tasks_processor_timers.hpp:8,
                 from main.cpp:4:
/home/travis/boost-local/boost/asio/execution_context.hpp:106:7: error: within this context
 class execution_context
       ^
In file included from /home/travis/boost-local/boost/asio/io_service.hpp:18:0,
                 from ../tasks_processor_base/tasks_processor_base.hpp:11,
                 from tasks_processor_timers.hpp:8,
                 from main.cpp:4:
/home/travis/boost-local/boost/asio/io_context.hpp:179:7: note: synthesized method ‘boost::asio::execution_context::execution_context(const boost::asio::execution_context&)’ first required here 
 class io_context
       ^
In file included from /home/travis/boost-local/boost/smart_ptr/make_shared.hpp:14:0,
                 from /home/travis/boost-local/boost/make_shared.hpp:14,
                 from tasks_processor_timers.hpp:13,
                 from main.cpp:4:
/home/travis/boost-local/boost/smart_ptr/make_shared_object.hpp:396:5: note: synthesized method ‘boost::asio::io_context::io_context(const boost::asio::io_context&)’ first required here 
     ::new( pv ) T(
     ^
In file included from /home/travis/boost-local/boost/asio/executor.hpp:338:0,
                 from /home/travis/boost-local/boost/asio/basic_deadline_timer.hpp:31,
                 from /home/travis/boost-local/boost/asio/deadline_timer.hpp:24,
                 from tasks_processor_timers.hpp:11,
                 from main.cpp:4:
/home/travis/boost-local/boost/asio/impl/executor.hpp:332:1: error:   initializing argument 1 of ‘boost::asio::executor::executor(Executor) [with Executor = boost::asio::io_context]’
 executor::executor(Executor e)
 ^
make[2]: *** [main.o] Error 1

Is there a known workaround for the old compilers?

@chriskohlhoff
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a known workaround for the old compilers?

If you only need to support the forthcoming boost release, then:

boost::make_shared<tcp::socket>(my_io_context.get_executor());

If you want to support both old and new boost versions then obviously you could just use new directly and construct a shared_ptr from that, but to continue using make_shared you could try something like:

template <typename T>                                                             
struct helper : T                                                     
{                                                                                 
  template <typename U> helper(U* u) : T(*u) {}                       
};

boost::shared_ptr<tcp::socket> s = boost::make_shared<helper<tcp::socket> >(&my_io_context);

@SoapGentoo
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chriskohlhoff is the workaround suggested in https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=236601 namely something along the lines of

#if BOOST_VERSION >= 107000
#define GET_IO_SERVICE(s) ((boost::asio::io_context&)(s).get_executor().context())
#else
#define GET_IO_SERVICE(s) ((s).get_io_service())
#endif

an acceptable workaround for old and new boost versions?

@djarek
Copy link

@djarek djarek commented on 59066d8 Apr 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(boost::asio::io_context&)(s).get_executor().context() this cast is unsafe, do you actually need the io_context&? 1.70 added constructors to sockets and other I/O objects that allow constructing directly from an executor. So if this macro is to be used to construct a socket, it should be:

#if BOOST_VERSION >= 107000
#define GET_IO_SERVICE(s) ((s).get_executor())
#else
#define GET_IO_SERVICE(s) ((s).get_io_service())
#endif

@SoapGentoo
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@djarek Thanks for the quick reply. Could you help me develop a simple, short fix for something like nghttp2/nghttp2#1320 that is dispatched on BOOST_VERSION? We're getting a lot of io_context related failures with boost 1.70, and I'm looking for a simple canonical fix.

@djarek
Copy link

@djarek djarek commented on 59066d8 Apr 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SoapGentoo that macro should work just fine, you can wrap it into a function if you don't like spilling macros everywhere.

@Vultraz
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The line that caused us (Wesnoth) problems was

socket_ptr socket = std::make_shared<boost::asio::ip::tcp::socket>(std::ref(io_service_));

We resolved the issue by removing the use of std::ref (it's redundant anyway).

Please sign in to comment.