diff --git a/src/core/core/network.cpp b/src/core/core/network.cpp index bb31c2c41f..832a959ba0 100644 --- a/src/core/core/network.cpp +++ b/src/core/core/network.cpp @@ -334,7 +334,6 @@ rpc_session::rpc_session(connection_oriented_network &net, _message_count(0), _is_sending_next(false), _message_sent(0), - _net(net), _remote_addr(remote_addr), _max_buffer_block_count_per_send(net.max_buffer_block_count_per_send()), @@ -724,4 +723,4 @@ void connection_oriented_network::on_client_session_disconnected(rpc_session_ptr scount); } } -} +} // namespace dsn diff --git a/src/core/tools/common/asio_rpc_session.cpp b/src/core/tools/common/asio_rpc_session.cpp index cac400d22e..570d635ddc 100644 --- a/src/core/tools/common/asio_rpc_session.cpp +++ b/src/core/tools/common/asio_rpc_session.cpp @@ -24,15 +24,6 @@ * THE SOFTWARE. */ -/* - * Description: - * What is this file about? - * - * Revision history: - * xxxx-xx-xx, author, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - #include "asio_rpc_session.h" namespace dsn { @@ -42,6 +33,8 @@ asio_rpc_session::~asio_rpc_session() {} void asio_rpc_session::set_options() { + utils::auto_write_lock socket_guard(_socket_lock); + if (_socket->is_open()) { boost::system::error_code ec; boost::asio::socket_base::send_buffer_size option, option2(16 * 1024 * 1024); @@ -92,6 +85,8 @@ void asio_rpc_session::do_read(int read_next) void *ptr = _reader.read_buffer_ptr(read_next); int remaining = _reader.read_buffer_capacity(); + utils::auto_read_lock socket_guard(_socket_lock); + _socket->async_read_some( boost::asio::buffer(ptr, remaining), [this](boost::system::error_code ec, std::size_t length) { @@ -136,20 +131,22 @@ void asio_rpc_session::do_read(int read_next) }); } -void asio_rpc_session::write(uint64_t signature) +void asio_rpc_session::send(uint64_t signature) { - std::vector buffers2; + std::vector asio_wbufs; int bcount = (int)_sending_buffers.size(); // prepare buffers - buffers2.resize(bcount); + asio_wbufs.resize(bcount); for (int i = 0; i < bcount; i++) { - buffers2[i] = boost::asio::const_buffer(_sending_buffers[i].buf, _sending_buffers[i].sz); + asio_wbufs[i] = boost::asio::const_buffer(_sending_buffers[i].buf, _sending_buffers[i].sz); } add_ref(); + + utils::auto_read_lock socket_guard(_socket_lock); boost::asio::async_write( - *_socket, buffers2, [this, signature](boost::system::error_code ec, std::size_t length) { + *_socket, asio_wbufs, [this, signature](boost::system::error_code ec, std::size_t length) { if (!!ec) { derror( "asio write to %s failed: %s", _remote_addr.to_string(), ec.message().c_str()); @@ -175,12 +172,14 @@ asio_rpc_session::asio_rpc_session(asio_network_provider &net, void asio_rpc_session::on_failure(bool is_write) { if (on_disconnected(is_write)) { - safe_close(); + close(); } } -void asio_rpc_session::safe_close() +void asio_rpc_session::close() { + utils::auto_write_lock socket_guard(_socket_lock); + boost::system::error_code ec; _socket->shutdown(boost::asio::socket_base::shutdown_type::shutdown_both, ec); if (ec) @@ -215,5 +214,5 @@ void asio_rpc_session::connect() }); } } -} -} +} // namespace tools +} // namespace dsn diff --git a/src/core/tools/common/asio_rpc_session.h b/src/core/tools/common/asio_rpc_session.h index 8284678a0a..d3457f0a54 100644 --- a/src/core/tools/common/asio_rpc_session.h +++ b/src/core/tools/common/asio_rpc_session.h @@ -24,15 +24,6 @@ * THE SOFTWARE. */ -/* - * Description: - * What is this file about? - * - * Revision history: - * xxxx-xx-xx, author, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - #pragma once #include @@ -44,6 +35,7 @@ namespace dsn { namespace tools { +// Thread-safe class asio_rpc_session : public rpc_session { public: @@ -52,16 +44,17 @@ class asio_rpc_session : public rpc_session std::shared_ptr &socket, message_parser_ptr &parser, bool is_client); - virtual ~asio_rpc_session(); - virtual void send(uint64_t signature) override { return write(signature); } - virtual void close() override { safe_close(); } -public: - virtual void connect() override; + ~asio_rpc_session() override; + + void send(uint64_t signature) override; + + void close() override; + + void connect() override; private: - virtual void do_read(int read_next) override; - void write(uint64_t signature); + void do_read(int read_next) override; void on_failure(bool is_write = false); void set_options(); void on_message_read(message_ex *msg) @@ -70,10 +63,13 @@ class asio_rpc_session : public rpc_session on_failure(false); } } - void safe_close(); private: + // boost::asio::socket is thread-unsafe, must use lock to prevent a + // reading/writing socket being modified or closed concurrently. std::shared_ptr _socket; + ::dsn::utils::rw_lock_nr _socket_lock; }; -} -} + +} // namespace tools +} // namespace dsn