From b6e49c821d883c181b9f7f4e4053af203faf0f27 Mon Sep 17 00:00:00 2001 From: Aleksandr Lyapunov Date: Sun, 28 Aug 2022 15:33:39 +0300 Subject: [PATCH] Client: refactor network provider Make it work with connections directly instead of descriptors. Part of #28 --- src/Client/Connection.hpp | 17 +++++- src/Client/Connector.hpp | 10 +--- src/Client/EpollNetProvider.hpp | 94 +++++++++------------------------ src/Client/LibevNetProvider.hpp | 5 +- 4 files changed, 45 insertions(+), 81 deletions(-) diff --git a/src/Client/Connection.hpp b/src/Client/Connection.hpp index e9aa5de1a..1233b08a8 100644 --- a/src/Client/Connection.hpp +++ b/src/Client/Connection.hpp @@ -66,9 +66,10 @@ struct ConnectionImpl ConnectionImpl& operator = (const ConnectionImpl& impl) = delete; ~ConnectionImpl(); +public: void ref(); void unref(); -public: + Connector &connector; BUFFER inBuf; BUFFER outBuf; @@ -101,7 +102,8 @@ ConnectionImpl::~ConnectionImpl() { assert(refs == 0); if (socket >= 0) { - connector.close(socket); + Connection conn(this); + connector.close(conn); socket = -1; } } @@ -130,12 +132,16 @@ class Connection public: class Space; Space space; + using Impl_t = ConnectionImpl; Connection(Connector &connector); + Connection(Impl_t *a); ~Connection(); Connection(const Connection& connection); Connection& operator = (const Connection& connection); + Impl_t *getImpl() { return impl; } + //Required for storing Connections in hash tables (std::unordered_map) friend bool operator == (const Connection& lhs, const Connection& rhs) @@ -325,6 +331,13 @@ Connection::Connection(Connector &conn impl->ref(); } +template +Connection::Connection(ConnectionImpl *a) : + space(*this), impl(a) +{ + impl->ref(); +} + template Connection::Connection(const Connection& connection) : space(*this), impl(connection.impl) diff --git a/src/Client/Connector.hpp b/src/Client/Connector.hpp index 7842e64ca..47f621c2f 100644 --- a/src/Client/Connector.hpp +++ b/src/Client/Connector.hpp @@ -74,7 +74,6 @@ class Connector void finishSend(const Connection &conn); std::set> m_ReadyToSend; - void close(int socket); void close(Connection &conn); private: //Timeout of Connector::connect() method. @@ -109,19 +108,12 @@ Connector::connect(Connection &conn, return 0; } -template -void -Connector::close(int socket) -{ - m_NetProvider.close(socket); -} - template void Connector::close(Connection &conn) { assert(conn.getSocket() >= 0); - m_NetProvider.close(conn.getSocket()); + m_NetProvider.close(conn); conn.setSocket(-1); } diff --git a/src/Client/EpollNetProvider.hpp b/src/Client/EpollNetProvider.hpp index d37c3659c..a0f986dbe 100644 --- a/src/Client/EpollNetProvider.hpp +++ b/src/Client/EpollNetProvider.hpp @@ -56,15 +56,13 @@ class EpollNetProvider { ~EpollNetProvider(); int connect(Conn_t &conn, const std::string_view& addr, unsigned port, size_t timeout); - void close(int socket); + void close(Conn_t &conn); /** Read and write to sockets; polling using epoll. */ int wait(int timeout); bool check(Conn_t &conn); private: static constexpr size_t DEFAULT_TIMEOUT = 100; - static constexpr size_t EVENT_POLL_COUNT_MAX = 64; - static constexpr size_t EPOLL_QUEUE_LEN = 1024; static constexpr size_t EPOLL_EVENTS_MAX = 128; //return 0 if all data from buffer was processed (sent or read); @@ -73,13 +71,10 @@ class EpollNetProvider { int send(Conn_t &conn); int recv(Conn_t &conn); - int poll(struct ConnectionEvent *fds, size_t *fd_count, - int timeout = DEFAULT_TIMEOUT); - void setPollSetting(int socket, int setting); - void registerEpoll(int socket); + void setPollSetting(Conn_t &conn, int setting); + void registerEpoll(Conn_t &conn); /** map. Contains both ready to read/send connections */ - std::map m_Connections; Connector_t &m_Connector; int m_EpollFd; }; @@ -88,7 +83,7 @@ template EpollNetProvider::EpollNetProvider(Connector_t &connector) : m_Connector(connector) { - m_EpollFd = epoll_create(EPOLL_QUEUE_LEN); + m_EpollFd = epoll_create1(EPOLL_CLOEXEC); if (m_EpollFd == -1) { LOG_ERROR("Failed to initialize epoll: ", strerror(errno)); abort(); @@ -99,22 +94,20 @@ template EpollNetProvider::~EpollNetProvider() { ::close(m_EpollFd); - m_EpollFd = 0; - - for (auto conn = m_Connections.begin(); conn != m_Connections.end();) - conn = m_Connections.erase(conn); + m_EpollFd = -1; } template void -EpollNetProvider::registerEpoll(int socket) +EpollNetProvider::registerEpoll(Conn_t &conn) { /* Configure epoll with new socket. */ assert(m_EpollFd >= 0); struct epoll_event event; event.events = EPOLLIN; - event.data.fd = socket; - if (epoll_ctl(m_EpollFd, EPOLL_CTL_ADD, socket, &event) != 0) { + event.data.ptr = conn.getImpl(); + conn.getImpl()->ref(); + if (epoll_ctl(m_EpollFd, EPOLL_CTL_ADD, conn.getSocket(), &event) != 0) { LOG_ERROR("Failed to add socket to epoll: " "epoll_ctl() returned with errno: ", strerror(errno)); @@ -124,11 +117,11 @@ EpollNetProvider::registerEpoll(int socket) template void -EpollNetProvider::setPollSetting(int socket, int setting) { +EpollNetProvider::setPollSetting(Conn_t &conn, int setting) { struct epoll_event event; event.events = setting; - event.data.fd = socket; - if (epoll_ctl(m_EpollFd, EPOLL_CTL_MOD, socket, &event) != 0) { + event.data.ptr = conn.getImpl(); + if (epoll_ctl(m_EpollFd, EPOLL_CTL_MOD, conn.getSocket(), &event) != 0) { LOG_ERROR("Failed to change epoll mode: " "epoll_ctl() returned with errno: ", strerror(errno)); @@ -173,16 +166,16 @@ EpollNetProvider::connect(Conn_t &conn, LOG_DEBUG("Greetings are decoded"); LOG_DEBUG("Authentication processing..."); //TODO: add authentication step. - registerEpoll(socket); conn.setSocket(socket); - m_Connections.insert({socket, conn}); + registerEpoll(conn); return 0; } template void -EpollNetProvider::close(int socket) +EpollNetProvider::close(Conn_t& conn) { + int socket = conn.getSocket(); assert(socket >= 0); #ifndef NDEBUG struct sockaddr sa; @@ -201,46 +194,15 @@ EpollNetProvider::close(int socket) " corresponding to address ", addr); } #endif + conn.getImpl()->unref(); NETWORK::close(socket); - struct epoll_event event; - event.events = EPOLLIN; - event.data.fd = socket; /* * Descriptor is automatically removed from epoll handler * when all descriptors are closed. So in case * there's other descriptors on open socket, invoke * epoll_ctl manually. */ - epoll_ctl(m_EpollFd, EPOLL_CTL_DEL, socket, &event); - //close can be called during epoll provider destruction. In this case - //all connections staying alive only due to the presence in m_Connections - //map. While cleaning up m_Connections destructors of connections will be - //called. So to avoid double-free presence check in m_Connections is required. - if (m_Connections.find(socket) != m_Connections.end()) { - assert(m_Connections.find(socket)->second.getSocket() == socket); - m_Connections.erase(socket); - } -} - -template -int -EpollNetProvider::poll(struct ConnectionEvent *fds, - size_t *fd_count, int timeout) -{ - static struct epoll_event events[EPOLL_EVENTS_MAX]; - *fd_count = 0; - int event_cnt = epoll_wait(m_EpollFd, events, EPOLL_EVENTS_MAX, - timeout); - if (event_cnt == -1) - return -1; - assert(event_cnt >= 0); - for (int i = 0; i < event_cnt; ++i) { - fds[*fd_count].sock = events[i].data.fd; - fds[*fd_count].event = events[i].events; - (*fd_count)++; - } - assert(*fd_count == (size_t) event_cnt); - return 0; + epoll_ctl(m_EpollFd, EPOLL_CTL_DEL, socket, nullptr); } template @@ -286,7 +248,7 @@ EpollNetProvider::send(Conn_t &conn) hasSentBytes(conn, sent_bytes); if (rc != 0) { if (netWouldBlock(errno)) { - setPollSetting(conn.getSocket(), EPOLLIN | EPOLLOUT); + setPollSetting(conn, EPOLLIN | EPOLLOUT); return 1; } conn.setError(std::string("Failed to send request: ") + @@ -316,19 +278,17 @@ EpollNetProvider::wait(int timeout) } /* Firstly poll connections to point out if there's data to read. */ - static struct ConnectionEvent events[EVENT_POLL_COUNT_MAX]; - size_t event_cnt = 0; - if (poll((ConnectionEvent *)&events, &event_cnt, timeout) != 0) { + struct epoll_event events[EPOLL_EVENTS_MAX]; + int event_cnt = epoll_wait(m_EpollFd, events, EPOLL_EVENTS_MAX, timeout); + if (event_cnt < 0) { //Poll error doesn't belong to any connection so just global //log it. LOG_ERROR("Poll failed: ", strerror(errno)); return -1; } - for (size_t i = 0; i < event_cnt; ++i) { - assert(m_Connections.find(events[i].sock) != m_Connections.end()); - if ((events[i].event & EPOLLIN) != 0) { - Conn_t conn = m_Connections.find(events[i].sock)->second; - assert(conn.getSocket() == events[i].sock); + for (int i = 0; i < event_cnt; ++i) { + Conn_t conn((typename Conn_t::Impl_t *)events[i].data.ptr); + if ((events[i].events & EPOLLIN) != 0) { LOG_DEBUG("Registered poll event ", i, ": ", conn.getSocket(), " socket is ready to read"); /* @@ -342,9 +302,7 @@ EpollNetProvider::wait(int timeout) m_Connector.readyToDecode(conn); } - if ((events[i].event & EPOLLOUT) != 0) { - Conn_t conn = m_Connections.find(events[i].sock)->second; - assert(conn.getSocket() == events[i].sock); + if ((events[i].events & EPOLLOUT) != 0) { LOG_DEBUG("Registered poll event ", i, ": ", conn.getSocket(), " socket is ready to write"); int rc = send(conn); @@ -353,7 +311,7 @@ EpollNetProvider::wait(int timeout) /* All data from connection has been successfully written. */ if (rc == 0) { m_Connector.finishSend(conn); - setPollSetting(conn.getSocket(), EPOLLIN); + setPollSetting(conn, EPOLLIN); } } } diff --git a/src/Client/LibevNetProvider.hpp b/src/Client/LibevNetProvider.hpp index 0ce76e37a..4646959ec 100644 --- a/src/Client/LibevNetProvider.hpp +++ b/src/Client/LibevNetProvider.hpp @@ -88,7 +88,7 @@ class LibevNetProvider { LibevNetProvider(Connector_t &connector, struct ev_loop *loop = nullptr); int connect(Conn_t &conn, const std::string_view& addr, unsigned port, size_t timeout); - void close(int socket); + void close(Conn_t &conn); int wait(int timeout); bool check(Conn_t &conn); @@ -316,8 +316,9 @@ LibevNetProvider::connect(Conn_t &conn, template void -LibevNetProvider::close(int socket) +LibevNetProvider::close(Conn_t &conn) { + int socket = conn.getSocket(); NETWORK::close(socket); //close can be called during libev provider destruction. In this case //all connections staying alive only due to the presence in m_Watchers