Skip to content

Commit

Permalink
Client: refactor network provider
Browse files Browse the repository at this point in the history
Make it work with connections directly instead of descriptors.

Part of #28
  • Loading branch information
alyapunov committed Aug 31, 2022
1 parent 1e33f8b commit b6e49c8
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 81 deletions.
17 changes: 15 additions & 2 deletions src/Client/Connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ struct ConnectionImpl
ConnectionImpl& operator = (const ConnectionImpl& impl) = delete;
~ConnectionImpl();

public:
void ref();
void unref();
public:

Connector<BUFFER, NetProvider> &connector;
BUFFER inBuf;
BUFFER outBuf;
Expand Down Expand Up @@ -101,7 +102,8 @@ ConnectionImpl<BUFFER, NetProvider>::~ConnectionImpl()
{
assert(refs == 0);
if (socket >= 0) {
connector.close(socket);
Connection<BUFFER, NetProvider> conn(this);
connector.close(conn);
socket = -1;
}
}
Expand Down Expand Up @@ -130,12 +132,16 @@ class Connection
public:
class Space;
Space space;
using Impl_t = ConnectionImpl<BUFFER, NetProvider>;

Connection(Connector<BUFFER, NetProvider> &connector);
Connection(Impl_t *a);
~Connection();
Connection(const Connection& connection);
Connection& operator = (const Connection& connection);

Impl_t *getImpl() { return impl; }

//Required for storing Connections in hash tables (std::unordered_map)
friend bool operator == (const Connection<BUFFER, NetProvider>& lhs,
const Connection<BUFFER, NetProvider>& rhs)
Expand Down Expand Up @@ -325,6 +331,13 @@ Connection<BUFFER, NetProvider>::Connection(Connector<BUFFER, NetProvider> &conn
impl->ref();
}

template<class BUFFER, class NetProvider>
Connection<BUFFER, NetProvider>::Connection(ConnectionImpl<BUFFER, NetProvider> *a) :
space(*this), impl(a)
{
impl->ref();
}

template<class BUFFER, class NetProvider>
Connection<BUFFER, NetProvider>::Connection(const Connection& connection) :
space(*this), impl(connection.impl)
Expand Down
10 changes: 1 addition & 9 deletions src/Client/Connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class Connector
void finishSend(const Connection<BUFFER, NetProvider> &conn);

std::set<Connection<BUFFER, NetProvider>> m_ReadyToSend;
void close(int socket);
void close(Connection<BUFFER, NetProvider> &conn);
private:
//Timeout of Connector::connect() method.
Expand Down Expand Up @@ -109,19 +108,12 @@ Connector<BUFFER, NetProvider>::connect(Connection<BUFFER, NetProvider> &conn,
return 0;
}

template<class BUFFER, class NetProvider>
void
Connector<BUFFER, NetProvider>::close(int socket)
{
m_NetProvider.close(socket);
}

template<class BUFFER, class NetProvider>
void
Connector<BUFFER, NetProvider>::close(Connection<BUFFER, NetProvider> &conn)
{
assert(conn.getSocket() >= 0);
m_NetProvider.close(conn.getSocket());
m_NetProvider.close(conn);
conn.setSocket(-1);
}

Expand Down
94 changes: 26 additions & 68 deletions src/Client/EpollNetProvider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,13 @@ class EpollNetProvider {
~EpollNetProvider();
int connect(Conn_t &conn, const std::string_view& addr, unsigned port,
size_t timeout);
void close(int socket);
void close(Conn_t &conn);
/** Read and write to sockets; polling using epoll. */
int wait(int timeout);

bool check(Conn_t &conn);
private:
static constexpr size_t DEFAULT_TIMEOUT = 100;
static constexpr size_t EVENT_POLL_COUNT_MAX = 64;
static constexpr size_t EPOLL_QUEUE_LEN = 1024;
static constexpr size_t EPOLL_EVENTS_MAX = 128;

//return 0 if all data from buffer was processed (sent or read);
Expand All @@ -73,13 +71,10 @@ class EpollNetProvider {
int send(Conn_t &conn);
int recv(Conn_t &conn);

int poll(struct ConnectionEvent *fds, size_t *fd_count,
int timeout = DEFAULT_TIMEOUT);
void setPollSetting(int socket, int setting);
void registerEpoll(int socket);
void setPollSetting(Conn_t &conn, int setting);
void registerEpoll(Conn_t &conn);

/** <socket : connection> map. Contains both ready to read/send connections */
std::map<int, Conn_t > m_Connections;
Connector_t &m_Connector;
int m_EpollFd;
};
Expand All @@ -88,7 +83,7 @@ template<class BUFFER, class NETWORK>
EpollNetProvider<BUFFER, NETWORK>::EpollNetProvider(Connector_t &connector) :
m_Connector(connector)
{
m_EpollFd = epoll_create(EPOLL_QUEUE_LEN);
m_EpollFd = epoll_create1(EPOLL_CLOEXEC);
if (m_EpollFd == -1) {
LOG_ERROR("Failed to initialize epoll: ", strerror(errno));
abort();
Expand All @@ -99,22 +94,20 @@ template<class BUFFER, class NETWORK>
EpollNetProvider<BUFFER, NETWORK>::~EpollNetProvider()
{
::close(m_EpollFd);
m_EpollFd = 0;

for (auto conn = m_Connections.begin(); conn != m_Connections.end();)
conn = m_Connections.erase(conn);
m_EpollFd = -1;
}

template<class BUFFER, class NETWORK>
void
EpollNetProvider<BUFFER, NETWORK>::registerEpoll(int socket)
EpollNetProvider<BUFFER, NETWORK>::registerEpoll(Conn_t &conn)
{
/* Configure epoll with new socket. */
assert(m_EpollFd >= 0);
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = socket;
if (epoll_ctl(m_EpollFd, EPOLL_CTL_ADD, socket, &event) != 0) {
event.data.ptr = conn.getImpl();
conn.getImpl()->ref();
if (epoll_ctl(m_EpollFd, EPOLL_CTL_ADD, conn.getSocket(), &event) != 0) {
LOG_ERROR("Failed to add socket to epoll: "
"epoll_ctl() returned with errno: ",
strerror(errno));
Expand All @@ -124,11 +117,11 @@ EpollNetProvider<BUFFER, NETWORK>::registerEpoll(int socket)

template<class BUFFER, class NETWORK>
void
EpollNetProvider<BUFFER, NETWORK>::setPollSetting(int socket, int setting) {
EpollNetProvider<BUFFER, NETWORK>::setPollSetting(Conn_t &conn, int setting) {
struct epoll_event event;
event.events = setting;
event.data.fd = socket;
if (epoll_ctl(m_EpollFd, EPOLL_CTL_MOD, socket, &event) != 0) {
event.data.ptr = conn.getImpl();
if (epoll_ctl(m_EpollFd, EPOLL_CTL_MOD, conn.getSocket(), &event) != 0) {
LOG_ERROR("Failed to change epoll mode: "
"epoll_ctl() returned with errno: ",
strerror(errno));
Expand Down Expand Up @@ -173,16 +166,16 @@ EpollNetProvider<BUFFER, NETWORK>::connect(Conn_t &conn,
LOG_DEBUG("Greetings are decoded");
LOG_DEBUG("Authentication processing...");
//TODO: add authentication step.
registerEpoll(socket);
conn.setSocket(socket);
m_Connections.insert({socket, conn});
registerEpoll(conn);
return 0;
}

template<class BUFFER, class NETWORK>
void
EpollNetProvider<BUFFER, NETWORK>::close(int socket)
EpollNetProvider<BUFFER, NETWORK>::close(Conn_t& conn)
{
int socket = conn.getSocket();
assert(socket >= 0);
#ifndef NDEBUG
struct sockaddr sa;
Expand All @@ -201,46 +194,15 @@ EpollNetProvider<BUFFER, NETWORK>::close(int socket)
" corresponding to address ", addr);
}
#endif
conn.getImpl()->unref();
NETWORK::close(socket);
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = socket;
/*
* Descriptor is automatically removed from epoll handler
* when all descriptors are closed. So in case
* there's other descriptors on open socket, invoke
* epoll_ctl manually.
*/
epoll_ctl(m_EpollFd, EPOLL_CTL_DEL, socket, &event);
//close can be called during epoll provider destruction. In this case
//all connections staying alive only due to the presence in m_Connections
//map. While cleaning up m_Connections destructors of connections will be
//called. So to avoid double-free presence check in m_Connections is required.
if (m_Connections.find(socket) != m_Connections.end()) {
assert(m_Connections.find(socket)->second.getSocket() == socket);
m_Connections.erase(socket);
}
}

template<class BUFFER, class NETWORK>
int
EpollNetProvider<BUFFER, NETWORK>::poll(struct ConnectionEvent *fds,
size_t *fd_count, int timeout)
{
static struct epoll_event events[EPOLL_EVENTS_MAX];
*fd_count = 0;
int event_cnt = epoll_wait(m_EpollFd, events, EPOLL_EVENTS_MAX,
timeout);
if (event_cnt == -1)
return -1;
assert(event_cnt >= 0);
for (int i = 0; i < event_cnt; ++i) {
fds[*fd_count].sock = events[i].data.fd;
fds[*fd_count].event = events[i].events;
(*fd_count)++;
}
assert(*fd_count == (size_t) event_cnt);
return 0;
epoll_ctl(m_EpollFd, EPOLL_CTL_DEL, socket, nullptr);
}

template<class BUFFER, class NETWORK>
Expand Down Expand Up @@ -286,7 +248,7 @@ EpollNetProvider<BUFFER, NETWORK>::send(Conn_t &conn)
hasSentBytes(conn, sent_bytes);
if (rc != 0) {
if (netWouldBlock(errno)) {
setPollSetting(conn.getSocket(), EPOLLIN | EPOLLOUT);
setPollSetting(conn, EPOLLIN | EPOLLOUT);
return 1;
}
conn.setError(std::string("Failed to send request: ") +
Expand Down Expand Up @@ -316,19 +278,17 @@ EpollNetProvider<BUFFER, NETWORK>::wait(int timeout)
}

/* Firstly poll connections to point out if there's data to read. */
static struct ConnectionEvent events[EVENT_POLL_COUNT_MAX];
size_t event_cnt = 0;
if (poll((ConnectionEvent *)&events, &event_cnt, timeout) != 0) {
struct epoll_event events[EPOLL_EVENTS_MAX];
int event_cnt = epoll_wait(m_EpollFd, events, EPOLL_EVENTS_MAX, timeout);
if (event_cnt < 0) {
//Poll error doesn't belong to any connection so just global
//log it.
LOG_ERROR("Poll failed: ", strerror(errno));
return -1;
}
for (size_t i = 0; i < event_cnt; ++i) {
assert(m_Connections.find(events[i].sock) != m_Connections.end());
if ((events[i].event & EPOLLIN) != 0) {
Conn_t conn = m_Connections.find(events[i].sock)->second;
assert(conn.getSocket() == events[i].sock);
for (int i = 0; i < event_cnt; ++i) {
Conn_t conn((typename Conn_t::Impl_t *)events[i].data.ptr);
if ((events[i].events & EPOLLIN) != 0) {
LOG_DEBUG("Registered poll event ", i, ": ",
conn.getSocket(), " socket is ready to read");
/*
Expand All @@ -342,9 +302,7 @@ EpollNetProvider<BUFFER, NETWORK>::wait(int timeout)
m_Connector.readyToDecode(conn);
}

if ((events[i].event & EPOLLOUT) != 0) {
Conn_t conn = m_Connections.find(events[i].sock)->second;
assert(conn.getSocket() == events[i].sock);
if ((events[i].events & EPOLLOUT) != 0) {
LOG_DEBUG("Registered poll event ", i, ": ",
conn.getSocket(), " socket is ready to write");
int rc = send(conn);
Expand All @@ -353,7 +311,7 @@ EpollNetProvider<BUFFER, NETWORK>::wait(int timeout)
/* All data from connection has been successfully written. */
if (rc == 0) {
m_Connector.finishSend(conn);
setPollSetting(conn.getSocket(), EPOLLIN);
setPollSetting(conn, EPOLLIN);
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/Client/LibevNetProvider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class LibevNetProvider {
LibevNetProvider(Connector_t &connector, struct ev_loop *loop = nullptr);
int connect(Conn_t &conn, const std::string_view& addr, unsigned port,
size_t timeout);
void close(int socket);
void close(Conn_t &conn);
int wait(int timeout);
bool check(Conn_t &conn);

Expand Down Expand Up @@ -316,8 +316,9 @@ LibevNetProvider<BUFFER, NETWORK>::connect(Conn_t &conn,

template<class BUFFER, class NETWORK>
void
LibevNetProvider<BUFFER, NETWORK>::close(int socket)
LibevNetProvider<BUFFER, NETWORK>::close(Conn_t &conn)
{
int socket = conn.getSocket();
NETWORK::close(socket);
//close can be called during libev provider destruction. In this case
//all connections staying alive only due to the presence in m_Watchers
Expand Down

0 comments on commit b6e49c8

Please sign in to comment.