Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
rpc: fix asio bug that causes coredump (#263)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored and neverchanje committed Jun 28, 2019
1 parent d022004 commit 40f3d9b
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 39 deletions.
3 changes: 1 addition & 2 deletions src/core/core/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ rpc_session::rpc_session(connection_oriented_network &net,
_message_count(0),
_is_sending_next(false),
_message_sent(0),

_net(net),
_remote_addr(remote_addr),
_max_buffer_block_count_per_send(net.max_buffer_block_count_per_send()),
Expand Down Expand Up @@ -724,4 +723,4 @@ void connection_oriented_network::on_client_session_disconnected(rpc_session_ptr
scount);
}
}
}
} // namespace dsn
35 changes: 17 additions & 18 deletions src/core/tools/common/asio_rpc_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#include "asio_rpc_session.h"

namespace dsn {
Expand All @@ -42,6 +33,8 @@ asio_rpc_session::~asio_rpc_session() {}

void asio_rpc_session::set_options()
{
utils::auto_write_lock socket_guard(_socket_lock);

if (_socket->is_open()) {
boost::system::error_code ec;
boost::asio::socket_base::send_buffer_size option, option2(16 * 1024 * 1024);
Expand Down Expand Up @@ -92,6 +85,8 @@ void asio_rpc_session::do_read(int read_next)
void *ptr = _reader.read_buffer_ptr(read_next);
int remaining = _reader.read_buffer_capacity();

utils::auto_read_lock socket_guard(_socket_lock);

_socket->async_read_some(
boost::asio::buffer(ptr, remaining),
[this](boost::system::error_code ec, std::size_t length) {
Expand Down Expand Up @@ -136,20 +131,22 @@ void asio_rpc_session::do_read(int read_next)
});
}

void asio_rpc_session::write(uint64_t signature)
void asio_rpc_session::send(uint64_t signature)
{
std::vector<boost::asio::const_buffer> buffers2;
std::vector<boost::asio::const_buffer> asio_wbufs;
int bcount = (int)_sending_buffers.size();

// prepare buffers
buffers2.resize(bcount);
asio_wbufs.resize(bcount);
for (int i = 0; i < bcount; i++) {
buffers2[i] = boost::asio::const_buffer(_sending_buffers[i].buf, _sending_buffers[i].sz);
asio_wbufs[i] = boost::asio::const_buffer(_sending_buffers[i].buf, _sending_buffers[i].sz);
}

add_ref();

utils::auto_read_lock socket_guard(_socket_lock);
boost::asio::async_write(
*_socket, buffers2, [this, signature](boost::system::error_code ec, std::size_t length) {
*_socket, asio_wbufs, [this, signature](boost::system::error_code ec, std::size_t length) {
if (!!ec) {
derror(
"asio write to %s failed: %s", _remote_addr.to_string(), ec.message().c_str());
Expand All @@ -175,12 +172,14 @@ asio_rpc_session::asio_rpc_session(asio_network_provider &net,
void asio_rpc_session::on_failure(bool is_write)
{
if (on_disconnected(is_write)) {
safe_close();
close();
}
}

void asio_rpc_session::safe_close()
void asio_rpc_session::close()
{
utils::auto_write_lock socket_guard(_socket_lock);

boost::system::error_code ec;
_socket->shutdown(boost::asio::socket_base::shutdown_type::shutdown_both, ec);
if (ec)
Expand Down Expand Up @@ -215,5 +214,5 @@ void asio_rpc_session::connect()
});
}
}
}
}
} // namespace tools
} // namespace dsn
34 changes: 15 additions & 19 deletions src/core/tools/common/asio_rpc_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#pragma once

#include <dsn/tool-api/rpc_message.h>
Expand All @@ -44,6 +35,7 @@
namespace dsn {
namespace tools {

// Thread-safe
class asio_rpc_session : public rpc_session
{
public:
Expand All @@ -52,16 +44,17 @@ class asio_rpc_session : public rpc_session
std::shared_ptr<boost::asio::ip::tcp::socket> &socket,
message_parser_ptr &parser,
bool is_client);
virtual ~asio_rpc_session();
virtual void send(uint64_t signature) override { return write(signature); }
virtual void close() override { safe_close(); }

public:
virtual void connect() override;
~asio_rpc_session() override;

void send(uint64_t signature) override;

void close() override;

void connect() override;

private:
virtual void do_read(int read_next) override;
void write(uint64_t signature);
void do_read(int read_next) override;
void on_failure(bool is_write = false);
void set_options();
void on_message_read(message_ex *msg)
Expand All @@ -70,10 +63,13 @@ class asio_rpc_session : public rpc_session
on_failure(false);
}
}
void safe_close();

private:
// boost::asio::socket is thread-unsafe, must use lock to prevent a
// reading/writing socket being modified or closed concurrently.
std::shared_ptr<boost::asio::ip::tcp::socket> _socket;
::dsn::utils::rw_lock_nr _socket_lock;
};
}
}

} // namespace tools
} // namespace dsn

0 comments on commit 40f3d9b

Please sign in to comment.