Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

rpc: fix asio bug that causes coredump #263

Merged
merged 1 commit into from
Jun 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/core/core/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ rpc_session::rpc_session(connection_oriented_network &net,
_message_count(0),
_is_sending_next(false),
_message_sent(0),

_net(net),
_remote_addr(remote_addr),
_max_buffer_block_count_per_send(net.max_buffer_block_count_per_send()),
Expand Down Expand Up @@ -724,4 +723,4 @@ void connection_oriented_network::on_client_session_disconnected(rpc_session_ptr
scount);
}
}
}
} // namespace dsn
35 changes: 17 additions & 18 deletions src/core/tools/common/asio_rpc_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#include "asio_rpc_session.h"

namespace dsn {
Expand All @@ -42,6 +33,8 @@ asio_rpc_session::~asio_rpc_session() {}

void asio_rpc_session::set_options()
{
utils::auto_write_lock socket_guard(_socket_lock);

if (_socket->is_open()) {
boost::system::error_code ec;
boost::asio::socket_base::send_buffer_size option, option2(16 * 1024 * 1024);
Expand Down Expand Up @@ -92,6 +85,8 @@ void asio_rpc_session::do_read(int read_next)
void *ptr = _reader.read_buffer_ptr(read_next);
int remaining = _reader.read_buffer_capacity();

utils::auto_read_lock socket_guard(_socket_lock);

_socket->async_read_some(
boost::asio::buffer(ptr, remaining),
[this](boost::system::error_code ec, std::size_t length) {
Expand Down Expand Up @@ -136,20 +131,22 @@ void asio_rpc_session::do_read(int read_next)
});
}

void asio_rpc_session::write(uint64_t signature)
void asio_rpc_session::send(uint64_t signature)
{
std::vector<boost::asio::const_buffer> buffers2;
std::vector<boost::asio::const_buffer> asio_wbufs;
int bcount = (int)_sending_buffers.size();

// prepare buffers
buffers2.resize(bcount);
asio_wbufs.resize(bcount);
for (int i = 0; i < bcount; i++) {
buffers2[i] = boost::asio::const_buffer(_sending_buffers[i].buf, _sending_buffers[i].sz);
asio_wbufs[i] = boost::asio::const_buffer(_sending_buffers[i].buf, _sending_buffers[i].sz);
}

add_ref();

utils::auto_read_lock socket_guard(_socket_lock);
boost::asio::async_write(
*_socket, buffers2, [this, signature](boost::system::error_code ec, std::size_t length) {
*_socket, asio_wbufs, [this, signature](boost::system::error_code ec, std::size_t length) {
if (!!ec) {
derror(
"asio write to %s failed: %s", _remote_addr.to_string(), ec.message().c_str());
Expand All @@ -175,12 +172,14 @@ asio_rpc_session::asio_rpc_session(asio_network_provider &net,
void asio_rpc_session::on_failure(bool is_write)
{
if (on_disconnected(is_write)) {
safe_close();
close();
}
}

void asio_rpc_session::safe_close()
void asio_rpc_session::close()
{
utils::auto_write_lock socket_guard(_socket_lock);

boost::system::error_code ec;
_socket->shutdown(boost::asio::socket_base::shutdown_type::shutdown_both, ec);
if (ec)
Expand Down Expand Up @@ -215,5 +214,5 @@ void asio_rpc_session::connect()
});
}
}
}
}
} // namespace tools
} // namespace dsn
34 changes: 15 additions & 19 deletions src/core/tools/common/asio_rpc_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#pragma once

#include <dsn/tool-api/rpc_message.h>
Expand All @@ -44,6 +35,7 @@
namespace dsn {
namespace tools {

// Thread-safe
class asio_rpc_session : public rpc_session
{
public:
Expand All @@ -52,16 +44,17 @@ class asio_rpc_session : public rpc_session
std::shared_ptr<boost::asio::ip::tcp::socket> &socket,
message_parser_ptr &parser,
bool is_client);
virtual ~asio_rpc_session();
virtual void send(uint64_t signature) override { return write(signature); }
virtual void close() override { safe_close(); }

public:
virtual void connect() override;
~asio_rpc_session() override;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

加这么多空行干什么?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个应该是 convention? public functions 之间加 new line,留白给以后加注释。

void send(uint64_t signature) override;

void close() override;

void connect() override;

private:
virtual void do_read(int read_next) override;
void write(uint64_t signature);
void do_read(int read_next) override;
void on_failure(bool is_write = false);
void set_options();
void on_message_read(message_ex *msg)
Expand All @@ -70,10 +63,13 @@ class asio_rpc_session : public rpc_session
on_failure(false);
}
}
void safe_close();

private:
// boost::asio::socket is thread-unsafe, must use lock to prevent a
// reading/writing socket being modified or closed concurrently.
std::shared_ptr<boost::asio::ip::tcp::socket> _socket;
::dsn::utils::rw_lock_nr _socket_lock;
};
}
}

} // namespace tools
} // namespace dsn