Skip to content

Commit

Permalink
Client: introduce stream
Browse files Browse the repository at this point in the history
Implement Stream class instead of NetWorkEngine class.

Stream class encapsulates network connection with simple API:
connect, close, send, recv. It also has status that show blocking
state of I/O.

There must be no logical changes in this patch.

Part of #28
  • Loading branch information
alyapunov committed Aug 31, 2022
1 parent 259a059 commit b348525
Show file tree
Hide file tree
Showing 14 changed files with 1,059 additions and 510 deletions.
2 changes: 1 addition & 1 deletion examples/Simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ int WAIT_TIMEOUT = 1000; //milliseconds
//doclabel02-1
using Buf_t = tnt::Buffer<16 * 1024>;
#include "../src/Client/LibevNetProvider.hpp"
using Net_t = LibevNetProvider<Buf_t, NetworkEngine>;
using Net_t = LibevNetProvider<Buf_t, DefaultStream>;
//doclabel02-2

//doclabel16-1
Expand Down
31 changes: 10 additions & 21 deletions src/Client/Connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include "RequestEncoder.hpp"
#include "ResponseDecoder.hpp"
#include "Stream.hpp"
#include "../Utils/Logger.hpp"

#include <sys/uio.h> //iovec
Expand Down Expand Up @@ -80,7 +81,8 @@ struct ConnectionImpl
ResponseDecoder<BUFFER> dec;
/* Iterator separating decoded and raw data in input buffer. */
iterator endDecoded;
int socket;
/* Network layer of the connection. */
typename NetProvider::Stream_t strm;
//Several connection wrappers may point to the same implementation.
//It is useful to store connection objects in stl containers for example.
ssize_t refs;
Expand All @@ -94,18 +96,17 @@ struct ConnectionImpl
template<class BUFFER, class NetProvider>
ConnectionImpl<BUFFER, NetProvider>::ConnectionImpl(Connector<BUFFER, NetProvider> &conn) :
connector(conn), inBuf(), outBuf(), enc(outBuf), dec(inBuf),
endDecoded(inBuf.begin()), socket(-1), refs(0)
endDecoded(inBuf.begin()), refs(0)
{
}

template<class BUFFER, class NetProvider>
ConnectionImpl<BUFFER, NetProvider>::~ConnectionImpl()
{
assert(refs == 0);
if (socket >= 0) {
if (!strm.has_status(SS_DEAD)) {
Connection<BUFFER, NetProvider> conn(this);
connector.close(conn);
socket = -1;
}
}

Expand Down Expand Up @@ -143,6 +144,9 @@ class Connection

Impl_t *getImpl() { return impl; }

typename NetProvider::Stream_t &get_strm() { return impl->strm; }
const typename NetProvider::Stream_t &get_strm() const { return impl->strm; }

//Required for storing Connections in hash tables (std::unordered_map)
friend bool operator == (const Connection<BUFFER, NetProvider>& lhs,
const Connection<BUFFER, NetProvider>& rhs)
Expand All @@ -154,7 +158,8 @@ class Connection
friend bool operator < (const Connection<BUFFER, NetProvider>& lhs,
const Connection<BUFFER, NetProvider>& rhs)
{
return lhs.impl->socket < rhs.impl->socket;
// TODO: remove dependency on socket.
return lhs.get_strm().get_fd() < rhs.get_strm().get_fd();
}

Response<BUFFER> getResponse(rid_t future);
Expand All @@ -169,8 +174,6 @@ class Connection
void setError(const std::string &msg, int errno_ = 0);
ConnectionError& getError();
void reset();
int getSocket() const;
void setSocket(int socket);
BUFFER& getInBuf();
BUFFER& getOutBuf();

Expand Down Expand Up @@ -416,20 +419,6 @@ Connection<BUFFER, NetProvider>::reset()
impl->error = ConnectionError{};
}

template<class BUFFER, class NetProvider>
int
Connection<BUFFER, NetProvider>::getSocket() const
{
return impl->socket;
}

template<class BUFFER, class NetProvider>
void
Connection<BUFFER, NetProvider>::setSocket(int socket)
{
impl->socket = socket;
}

template<class BUFFER, class NetProvider>
BUFFER&
Connection<BUFFER, NetProvider>::getInBuf()
Expand Down
24 changes: 11 additions & 13 deletions src/Client/Connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,24 @@
* SUCH DAMAGE.
*/
#include "Connection.hpp"
#include "NetworkEngine.hpp"
#include "UnixPlainStream.hpp"
#include "../Utils/Timer.hpp"

#include <set>

using DefaultStream = UnixPlainStream;

/**
* MacOS does not have epoll so let's use Libev as default network provider.
*/
#ifdef __linux__
#include "EpollNetProvider.hpp"
template<class BUFFER>
using DefaultNetProvider = EpollNetProvider<BUFFER, NetworkEngine>;
using DefaultNetProvider = EpollNetProvider<BUFFER, DefaultStream>;
#else
#include "LibevNetProvider.hpp"
template<class BUFFER>
using DefaultNetProvider = LibevNetProvider<BUFFER, NetworkEngine>;
using DefaultNetProvider = LibevNetProvider<BUFFER, DefaultStream>;
#endif

template<class BUFFER, class NetProvider = DefaultNetProvider<BUFFER>>
Expand All @@ -58,8 +60,7 @@ class Connector
Connector& operator = (const Connector& connector) = delete;
//////////////////////////////Main API//////////////////////////////////
int connect(Connection<BUFFER, NetProvider> &conn,
const std::string_view& addr, unsigned port,
size_t timeout = DEFAULT_CONNECT_TIMEOUT);
const std::string& addr, unsigned port);

int wait(Connection<BUFFER, NetProvider> &conn, rid_t future,
int timeout = 0, Response<BUFFER> *result = nullptr);
Expand All @@ -76,8 +77,6 @@ class Connector
std::set<Connection<BUFFER, NetProvider>> m_ReadyToSend;
void close(Connection<BUFFER, NetProvider> &conn);
private:
//Timeout of Connector::connect() method.
constexpr static size_t DEFAULT_CONNECT_TIMEOUT = 2;
NetProvider m_NetProvider;
std::set<Connection<BUFFER, NetProvider>> m_ReadyToDecode;
};
Expand All @@ -95,12 +94,12 @@ Connector<BUFFER, NetProvider>::~Connector()
template<class BUFFER, class NetProvider>
int
Connector<BUFFER, NetProvider>::connect(Connection<BUFFER, NetProvider> &conn,
const std::string_view& addr,
unsigned port, size_t timeout)
const std::string& addr,
unsigned port)
{
//Make sure that connection is not yet established.
assert(conn.getSocket() < 0);
if (m_NetProvider.connect(conn, addr, port, timeout) != 0) {
assert(conn.get_strm().has_status(SS_DEAD));
if (m_NetProvider.connect(conn, addr, port) != 0) {
LOG_ERROR("Failed to connect to ", addr, ':', port);
return -1;
}
Expand All @@ -112,9 +111,8 @@ template<class BUFFER, class NetProvider>
void
Connector<BUFFER, NetProvider>::close(Connection<BUFFER, NetProvider> &conn)
{
assert(conn.getSocket() >= 0);
assert(!conn.get_strm().has_status(SS_DEAD));
m_NetProvider.close(conn);
conn.setSocket(-1);
}

template<class BUFFER, class NetProvider>
Expand Down
Loading

0 comments on commit b348525

Please sign in to comment.