diff --git a/docs/eventloop.zh-cn.md b/docs/eventloop.zh-cn.md index 40bc1219..b7f99013 100644 --- a/docs/eventloop.zh-cn.md +++ b/docs/eventloop.zh-cn.md @@ -12,6 +12,10 @@ 当有事件产生时,做完工作后即可返回,所需时间依负荷而定.
通常,我们会开启一个线程,在其中间断性的调用`loop`接口。 +- `EventLoop::bindCurrentThread` + + 初始化调度EventLoop的thread id,用于使用EventLoop接口时时判断当前线程是否处于EventLoop::loop所在线程。在使用EventLoop的一些函数,比如runAsyncFunctor时,都需要执行bindCurrentThread(当然,你也可以直接调用loop来初始化) + - `EventLoop::wakeup(void)` (线程安全)唤醒可能阻塞在`EventLoop::loop`中的等待。
@@ -20,7 +24,7 @@ - `EventLoop::runAsyncFunctor(std::function)` - (线程安全)投递一个异步函数给`EventLoop`,此函数会在`EventLoop::loop`调用中被执行。 + (线程安全)投递一个异步函数给`EventLoop`,此函数会在`EventLoop::loop`调用中被执行。如果此时EventLoop还没有初始化thread id(通过bindCurrentThread或loop),那么此函数会抛出异常,避免用户忘记调度IO工作线程时就投递了任务而导致逻辑错误。 - `EventLoop::runFunctorAfterLoop(std::function)` @@ -31,6 +35,15 @@ (线程安全)检测当前线程是否和 `EventLoop::loop`所在线程(也就是最先调用`loop`接口的线程)一样。 +- `brynet::base::Timer::WeakPtr EventLoop::runAfter(std::chrono::nanoseconds timeout, std::function&& callback) + + 开启一个延迟执行的函数,当到期时会在EventLoop工作线程(即loop函数所在线程)里执行callback。可通过返回的brynet::base::Timer::WeakPtr的cancel函数来取消定时器。 + +- `RepeatTimer::Ptr EventLoop::brynet::base::RepeatTimer::Ptr runIntervalTimer(std::chrono::nanoseconds timeout, std::function&& callback)` + + 开启一个重复执行的延迟执行的函数,当每次到期时会在EventLoop工作线程(即loop函数所在线程)里执行callback。可通过返回的RepeatTimer::Ptr的cancel函数来取消定时器。 + + # 注意事项 - 当我们第一次在某个线程中调用`loop`之后,就不应该在其他线程中调用`loop`(当然如果你调用了,也没有任何效果/影响) - 如果没有任何线程调用`loop`,那么使用`pushAsyncProc`投递的异步函数将不会被执行,直到有线程调用了`loop`接口,这些异步函数才会被执行。 diff --git a/docs/tcp_service.zh-cn.md b/docs/tcp_service.zh-cn.md index d3b6d566..54fc6996 100644 --- a/docs/tcp_service.zh-cn.md +++ b/docs/tcp_service.zh-cn.md @@ -1,28 +1,45 @@ # 概述 -`TCPService`是`brynet`中对`TcpConnection`和`EventLoop`的封装组合。 +`ITCPService`是`brynet`中对`TcpConnection`和`EventLoop`的封装抽象类型。它的派生类分为两个:`IOThreadTcpService`和`EventLoopTcpService`。 + + +`IOThreadTcpService`开启额外的IO工作线程,并使用其独立的EventLoop处理网络。 + +`EventLoopTcpService`则不会开启额外的线程,而是直接使用用户设定的EventLoop来处理网络,其线程就是调用`loop`函数来调度此EventLoop的线程, +它通常可以用于不需要极致性能的场景,比如直接在主线程处理IO。 + 源代码见:[TCPService.h](https://github.com/IronsDu/brynet/blob/master/include/brynet/net/TCPService.hpp).
# 接口 -- `TcpService::Create(void)` +- `ITcpService::addTcpConnection(TcpSocket::Ptr socket, Options...)` + + 这是ITcpService抽象类的接口,其用途是将一个TcpConnection交给具体的ITcpService管理,其中Options请查阅`AddSocketOption的WithXXX系列函数`。 + +- `IOThreadTcpService::Create(void)` - 此静态函数用于创建网络服务对象,且只能使用此接口创建`TcpService`对象。
+ 此静态函数用于创建网络服务对象,且只能使用此接口创建`ITcpService`对象。
用户通过服务对象操作网络会话。 -- `TcpService::startWorkerThread(size_t threadNum, FRAME_CALLBACK callback = nullptr)` +- `IOThreadTcpService::startWorkerThread(size_t threadNum, FRAME_CALLBACK callback = nullptr)` + + (线程安全)开启IO工作线程,每一个工作线程有一个`EventLoop`负责事件检测。 + + +- `IOThreadTcpService::stopWorkerThread()` - (线程安全)开启工作线程,每一个工作线程有一个`EventLoop`负责事件检测。 + (线程安全)关闭IO工作线程。 + +- `EventLoopTcpService::Create(EventLoop::Ptr eventLoop)` -- `TcpService::addTcpConnection(TcpSocket::Ptr socket, Options...)` + 根据用户指定的EventLoop构造一个单线程TcpService管理器。 - (线程安全),将一个TcpConnection交给TcpService管理,其中Options请查阅`AddSocketOption的WithXXX系列函数`。 ## 示例 ```C++ -auto service = TcpService::Create(); +auto service = IOThreadTcpService::Create(); // use blocking connect for test auto fd = ox_socket_connect(false, ip, port); auto socket = TcpSocket::Create(fd, false); diff --git a/examples/BenchWebsocket.cpp b/examples/BenchWebsocket.cpp index 6def6daf..b0b397c2 100644 --- a/examples/BenchWebsocket.cpp +++ b/examples/BenchWebsocket.cpp @@ -81,7 +81,7 @@ int main(int argc, char** argv) }); }; - auto service = TcpService::Create(); + auto service = IOThreadTcpService::Create(); service->startWorkerThread(workers); auto connector = AsyncConnector::Create(); diff --git a/examples/BroadCastServer.cpp b/examples/BroadCastServer.cpp index e1251f88..603fb131 100644 --- a/examples/BroadCastServer.cpp +++ b/examples/BroadCastServer.cpp @@ -23,7 +23,7 @@ std::atomic_llong SendPacketNum = ATOMIC_VAR_INIT(0); std::atomic_llong RecvPacketNum = ATOMIC_VAR_INIT(0); std::vector clients; -TcpService::Ptr service; +IOThreadTcpService::Ptr service; static void addClientID(const TcpConnection::Ptr& session) { @@ -74,7 +74,7 @@ int main(int argc, char** argv) int threadNum = atoi(argv[2]); brynet::net::base::InitSocket(); - service = TcpService::Create(); + service = IOThreadTcpService::Create(); auto mainLoop = std::make_shared(); auto listenThread = ListenThread::Create(false, "0.0.0.0", port, [mainLoop](TcpSocket::Ptr socket) { socket->setNodelay(); diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index ba8f418a..bbb5f81e 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -28,6 +28,14 @@ elseif(UNIX) target_link_libraries(pingpongserver pthread) endif() +add_executable(singlethread-pingpongserver SingleThreadPingPongServer.cpp ${HEADER_LIST}) +if(WIN32) + target_link_libraries(singlethread-pingpongserver ws2_32) +elseif(UNIX) + find_package(Threads REQUIRED) + target_link_libraries(singlethread-pingpongserver pthread) +endif() + add_executable(pingpongclient PingPongClient.cpp) if(WIN32) target_link_libraries(pingpongclient ws2_32) diff --git a/examples/HttpClient.cpp b/examples/HttpClient.cpp index c299c38f..ea501da6 100644 --- a/examples/HttpClient.cpp +++ b/examples/HttpClient.cpp @@ -19,7 +19,7 @@ int main(int argc, char** argv) (void) argc; (void) argv; - auto service = TcpService::Create(); + auto service = IOThreadTcpService::Create(); service->startWorkerThread(2); auto connector = brynet::net::AsyncConnector::Create(); diff --git a/examples/HttpServer.cpp b/examples/HttpServer.cpp index b43a56ce..7dcfb3d2 100644 --- a/examples/HttpServer.cpp +++ b/examples/HttpServer.cpp @@ -21,7 +21,7 @@ int main(int argc, char** argv) } const auto port = std::atoi(argv[1]); - auto service = TcpService::Create(); + auto service = IOThreadTcpService::Create(); service->startWorkerThread(atoi(argv[2])); auto httpEnterCallback = [](const HTTPParser& httpParser, diff --git a/examples/PingPongClient.cpp b/examples/PingPongClient.cpp index f9a4d1ba..a95bda86 100644 --- a/examples/PingPongClient.cpp +++ b/examples/PingPongClient.cpp @@ -18,7 +18,7 @@ int main(int argc, char** argv) std::string tmp(atoi(argv[5]), 'a'); - auto service = TcpService::Create(); + auto service = IOThreadTcpService::Create(); service->startWorkerThread(atoi(argv[3])); auto connector = AsyncConnector::Create(); diff --git a/examples/PingPongServer.cpp b/examples/PingPongServer.cpp index 3dc752a3..b960932c 100644 --- a/examples/PingPongServer.cpp +++ b/examples/PingPongServer.cpp @@ -21,7 +21,7 @@ int main(int argc, char** argv) exit(-1); } - auto service = TcpService::Create(); + auto service = IOThreadTcpService::Create(); service->startWorkerThread(atoi(argv[2])); auto enterCallback = [](const TcpConnection::Ptr& session) { diff --git a/examples/PromiseReceive.cpp b/examples/PromiseReceive.cpp index efc5e9e2..da4b1973 100644 --- a/examples/PromiseReceive.cpp +++ b/examples/PromiseReceive.cpp @@ -19,7 +19,7 @@ int main(int argc, char** argv) exit(-1); } - auto service = TcpService::Create(); + auto service = IOThreadTcpService::Create(); service->startWorkerThread(atoi(argv[2])); auto enterCallback = [](const TcpConnection::Ptr& session) { diff --git a/examples/SingleThreadPingPongServer.cpp b/examples/SingleThreadPingPongServer.cpp new file mode 100644 index 00000000..b4c7ca2b --- /dev/null +++ b/examples/SingleThreadPingPongServer.cpp @@ -0,0 +1,85 @@ +#include +#include +#include +#include +#include +#include +#include + +using namespace brynet; +using namespace brynet::net; + +std::atomic_llong TotalRecvSize = ATOMIC_VAR_INIT(0); +std::atomic_llong total_client_num = ATOMIC_VAR_INIT(0); +std::atomic_llong total_packet_num = ATOMIC_VAR_INIT(0); + +int main(int argc, char** argv) +{ + if (argc != 3) + { + fprintf(stderr, "Usage: \n"); + exit(-1); + } + + auto enterCallback = [](const TcpConnection::Ptr& session) { + total_client_num++; + + session->setDataCallback([session](brynet::base::BasePacketReader& reader) { + session->send(reader.begin(), reader.size()); + TotalRecvSize += reader.size(); + total_packet_num++; + reader.consumeAll(); + }); + + session->setDisConnectCallback([](const TcpConnection::Ptr& session) { + (void) session; + total_client_num--; + }); + }; + + EventLoop::Ptr mainLoop = std::make_shared(); + mainLoop->bindCurrentThread(); + + auto service = EventLoopTcpService::Create(mainLoop); + + wrapper::ListenerBuilder listener; + listener.WithService(service) + .AddSocketProcess({[](TcpSocket& socket) { + socket.setNodelay(); + }}) + .WithMaxRecvBufferSize(1024) + .AddEnterCallback(enterCallback) + .WithAddr(false, "0.0.0.0", atoi(argv[1])) + .asyncRun(); + + mainLoop->runIntervalTimer(std::chrono::seconds(1), [&]() { + if (TotalRecvSize / 1024 == 0) + { + std::cout << "total recv : " << TotalRecvSize << " bytes/s, of client num:" << total_client_num << std::endl; + } + else if ((TotalRecvSize / 1024) / 1024 == 0) + { + std::cout << "total recv : " << TotalRecvSize / 1024 << " K/s, of client num:" << total_client_num << std::endl; + } + else + { + std::cout << "total recv : " << (TotalRecvSize / 1024) / 1024 << " M/s, of client num:" << total_client_num << std::endl; + } + + std::cout << "packet num:" << total_packet_num << std::endl; + total_packet_num = 0; + TotalRecvSize = 0; + }); + + while (true) + { + mainLoop->loop(1000); + + if (brynet::base::app_kbhit()) + { + break; + } + } + + return 0; +} diff --git a/examples/WebBinaryProxy.cpp b/examples/WebBinaryProxy.cpp index 71b6c756..1f1638cb 100644 --- a/examples/WebBinaryProxy.cpp +++ b/examples/WebBinaryProxy.cpp @@ -23,7 +23,7 @@ int main(int argc, char** argv) string backendIP = argv[2]; int backendPort = atoi(argv[3]); - auto tcpService = TcpService::Create(); + auto tcpService = IOThreadTcpService::Create(); tcpService->startWorkerThread(std::thread::hardware_concurrency()); auto asyncConnector = AsyncConnector::Create(); diff --git a/include/brynet/base/Timer.hpp b/include/brynet/base/Timer.hpp index 55e917da..a11ab612 100644 --- a/include/brynet/base/Timer.hpp +++ b/include/brynet/base/Timer.hpp @@ -186,6 +186,20 @@ class TimerMgr final : public std::enable_shared_from_this } } + template + void helperAddIntervalTimer( + RepeatTimer::Ptr repeatTimer, + std::chrono::nanoseconds interval, + F&& callback, + TArgs&&... args) + { + auto sharedThis = shared_from_this(); + auto wrapperCallback = std::bind(std::forward(callback), std::forward(args)...); + addTimer(interval, [sharedThis, interval, wrapperCallback, repeatTimer]() { + stubRepeatTimerCallback(sharedThis, interval, wrapperCallback, repeatTimer); + }); + } + private: static void stubRepeatTimerCallback(TimerMgr::Ptr timerMgr, std::chrono::nanoseconds interval, diff --git a/include/brynet/net/EventLoop.hpp b/include/brynet/net/EventLoop.hpp index e39c4684..3b66b629 100644 --- a/include/brynet/net/EventLoop.hpp +++ b/include/brynet/net/EventLoop.hpp @@ -74,6 +74,7 @@ class EventLoop : public brynet::base::NonCopyable reAllocEventSize(1024); mSelfThreadID = -1; mTimer = std::make_shared(); + mSelfThreadIDIsInitialized.store(false); } virtual ~EventLoop() BRYNET_NOEXCEPT @@ -90,6 +91,11 @@ class EventLoop : public brynet::base::NonCopyable #endif } + void bindCurrentThread() + { + tryInitThreadID(); + } + void loop(int64_t milliseconds) { tryInitThreadID(); @@ -264,6 +270,10 @@ class EventLoop : public brynet::base::NonCopyable void runAsyncFunctor(UserFunctor&& f) { + if (!mSelfThreadIDIsInitialized.load()) + { + throw std::runtime_error("thread id not initialized, you need call `bindCurrentThread` first"); + } if (isInLoopThread()) { f(); @@ -274,6 +284,7 @@ class EventLoop : public brynet::base::NonCopyable wakeup(); } } + void runFunctorAfterLoop(UserFunctor&& f) { assert(isInLoopThread()); @@ -284,6 +295,7 @@ class EventLoop : public brynet::base::NonCopyable mAfterLoopFunctors.emplace_back(std::move(f)); } + brynet::base::Timer::WeakPtr runAfter(std::chrono::nanoseconds timeout, UserFunctor&& callback) { auto timer = std::make_shared( @@ -306,6 +318,23 @@ class EventLoop : public brynet::base::NonCopyable return timer; } + brynet::base::RepeatTimer::Ptr runIntervalTimer(std::chrono::nanoseconds timeout, UserFunctor&& callback) + { + if (isInLoopThread()) + { + return mTimer->addIntervalTimer(timeout, callback); + } + else + { + auto timer = std::make_shared(); + auto timerMgr = mTimer; + runAsyncFunctor([timerMgr, timer, timeout, callback]() { + timerMgr->helperAddIntervalTimer(timer, timeout, callback); + }); + return timer; + } + } + inline bool isInLoopThread() const { return mSelfThreadID == current_thread::tid(); @@ -403,6 +432,7 @@ class EventLoop : public brynet::base::NonCopyable { std::call_once(mOnceInitThreadID, [this]() { mSelfThreadID = current_thread::tid(); + mSelfThreadIDIsInitialized.store(true); }); } @@ -433,6 +463,7 @@ class EventLoop : public brynet::base::NonCopyable std::vector mCopyAfterLoopFunctors; std::once_flag mOnceInitThreadID; + std::atomic_bool mSelfThreadIDIsInitialized; current_thread::THREAD_ID_TYPE mSelfThreadID; brynet::base::TimerMgr::Ptr mTimer; diff --git a/include/brynet/net/TcpService.hpp b/include/brynet/net/TcpService.hpp index fbe011f7..0d42822b 100644 --- a/include/brynet/net/TcpService.hpp +++ b/include/brynet/net/TcpService.hpp @@ -5,17 +5,29 @@ namespace brynet { namespace net { using ConnectionOption = detail::ConnectionOption; -class TcpService : public detail::TcpServiceDetail, - public std::enable_shared_from_this + +class ITcpService { public: - using Ptr = std::shared_ptr; + using Ptr = std::shared_ptr; + +public: + virtual bool addTcpConnection(TcpSocket::Ptr socket, ConnectionOption options) = 0; +}; + +// use multi IO threads process IO +class IOThreadTcpService : public ITcpService, + public detail::TcpServiceDetail, + public std::enable_shared_from_this +{ +public: + using Ptr = std::shared_ptr; using FrameCallback = detail::TcpServiceDetail::FrameCallback; public: static Ptr Create() { - struct make_shared_enabler : public TcpService + struct make_shared_enabler : public IOThreadTcpService { }; return std::make_shared(); @@ -32,18 +44,37 @@ class TcpService : public detail::TcpServiceDetail, detail::TcpServiceDetail::stopWorkerThread(); } - bool addTcpConnection(TcpSocket::Ptr socket, ConnectionOption options) + bool addTcpConnection(TcpSocket::Ptr socket, ConnectionOption options) override { return detail::TcpServiceDetail::addTcpConnection(std::move(socket), options); } - EventLoop::Ptr getRandomEventLoop() +private: + IOThreadTcpService() = default; +}; + +// use specified eventloop for process IO +class EventLoopTcpService : public ITcpService +{ +public: + using Ptr = std::shared_ptr; + + EventLoopTcpService(EventLoop::Ptr eventLoop) + : mEventLoop(eventLoop) + {} + + static Ptr Create(EventLoop::Ptr eventLoop) + { + return std::make_shared(eventLoop); + } + + bool addTcpConnection(TcpSocket::Ptr socket, ConnectionOption options) override { - return detail::TcpServiceDetail::getRandomEventLoop(); + return detail::HelperAddTcpConnection(mEventLoop, std::move(socket), options); } private: - TcpService() = default; + EventLoop::Ptr mEventLoop; }; }}// namespace brynet::net diff --git a/include/brynet/net/detail/ConnectorDetail.hpp b/include/brynet/net/detail/ConnectorDetail.hpp index 0a48d062..41abe65d 100644 --- a/include/brynet/net/detail/ConnectorDetail.hpp +++ b/include/brynet/net/detail/ConnectorDetail.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -41,7 +42,12 @@ class AsyncConnectorDetail : public brynet::base::NonCopyable auto workerInfo = mWorkInfo; auto isRun = mIsRun; - mThread = std::make_shared([eventLoop, workerInfo, isRun]() { + auto wg = brynet::base::WaitGroup::Create(); + wg->add(1); + mThread = std::make_shared([wg, eventLoop, workerInfo, isRun]() { + eventLoop->bindCurrentThread(); + wg->done(); + while (*isRun) { detail::RunOnceCheckConnect(eventLoop, workerInfo); @@ -49,6 +55,8 @@ class AsyncConnectorDetail : public brynet::base::NonCopyable workerInfo->causeAllFailed(); }); + + wg->wait(); } void stopWorkerThread() diff --git a/include/brynet/net/detail/TCPServiceDetail.hpp b/include/brynet/net/detail/TCPServiceDetail.hpp index 7d251fef..182a4a6f 100644 --- a/include/brynet/net/detail/TCPServiceDetail.hpp +++ b/include/brynet/net/detail/TCPServiceDetail.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -15,12 +16,56 @@ namespace brynet { namespace net { namespace detail { +static bool HelperAddTcpConnection(EventLoop::Ptr eventLoop, TcpSocket::Ptr socket, ConnectionOption option) +{ + if (eventLoop == nullptr) + { + return false; + } + if (option.maxRecvBufferSize <= 0) + { + throw BrynetCommonException("buffer size is zero"); + } + + auto wrapperEnterCallback = [option](const TcpConnection::Ptr& tcpConnection) { + for (const auto& callback : option.enterCallback) + { + callback(tcpConnection); + } + }; + + if (option.useSSL && option.sslHelper == nullptr) + { + option.sslHelper = SSLHelper::Create(); + } + + TcpConnection::Create(std::move(socket), + option.maxRecvBufferSize, + wrapperEnterCallback, + eventLoop, + option.sslHelper); + + return true; +} + class TcpServiceDetail : public brynet::base::NonCopyable { protected: using FrameCallback = std::function; const static unsigned int sDefaultLoopTimeOutMS = 100; + TcpServiceDetail() BRYNET_NOEXCEPT + : mRandom(static_cast( + std::chrono::system_clock::now().time_since_epoch().count())) + { + mRunIOLoop = std::make_shared(false); + } + + virtual ~TcpServiceDetail() BRYNET_NOEXCEPT + { + stopWorkerThread(); + } + void startWorkerThread(size_t threadNum, FrameCallback callback = nullptr) { @@ -35,13 +80,18 @@ class TcpServiceDetail : public brynet::base::NonCopyable mRunIOLoop = std::make_shared(true); mIOLoopDatas.resize(threadNum); + auto wg = brynet::base::WaitGroup::Create(); for (auto& v : mIOLoopDatas) { auto eventLoop = std::make_shared(); auto runIoLoop = mRunIOLoop; + wg->add(1); v = IOLoopData::Create(eventLoop, std::make_shared( - [callback, runIoLoop, eventLoop]() { + [wg, callback, runIoLoop, eventLoop]() { + eventLoop->bindCurrentThread(); + wg->done(); + while (*runIoLoop) { eventLoop->loopCompareNearTimer(sDefaultLoopTimeOutMS); @@ -52,6 +102,7 @@ class TcpServiceDetail : public brynet::base::NonCopyable } })); } + wg->wait(); } void stopWorkerThread() @@ -81,11 +132,6 @@ class TcpServiceDetail : public brynet::base::NonCopyable bool addTcpConnection(TcpSocket::Ptr socket, ConnectionOption option) { - if (option.maxRecvBufferSize <= 0) - { - throw BrynetCommonException("buffer size is zero"); - } - EventLoop::Ptr eventLoop; if (option.forceSameThreadLoop) { @@ -95,30 +141,7 @@ class TcpServiceDetail : public brynet::base::NonCopyable { eventLoop = getRandomEventLoop(); } - if (eventLoop == nullptr) - { - return false; - } - - auto wrapperEnterCallback = [option](const TcpConnection::Ptr& tcpConnection) { - for (const auto& callback : option.enterCallback) - { - callback(tcpConnection); - } - }; - - if (option.useSSL && option.sslHelper == nullptr) - { - option.sslHelper = SSLHelper::Create(); - } - - TcpConnection::Create(std::move(socket), - option.maxRecvBufferSize, - wrapperEnterCallback, - eventLoop, - option.sslHelper); - - return true; + return HelperAddTcpConnection(eventLoop, std::move(socket), option); } EventLoop::Ptr getRandomEventLoop() @@ -140,18 +163,6 @@ class TcpServiceDetail : public brynet::base::NonCopyable } } - TcpServiceDetail() BRYNET_NOEXCEPT - : mRandom(static_cast( - std::chrono::system_clock::now().time_since_epoch().count())) - { - mRunIOLoop = std::make_shared(false); - } - - virtual ~TcpServiceDetail() BRYNET_NOEXCEPT - { - stopWorkerThread(); - } - EventLoop::Ptr getSameThreadEventLoop() { std::lock_guard lock(mIOLoopGuard); diff --git a/include/brynet/net/wrapper/ConnectionBuilder.hpp b/include/brynet/net/wrapper/ConnectionBuilder.hpp index 4989fbd8..104fd318 100644 --- a/include/brynet/net/wrapper/ConnectionBuilder.hpp +++ b/include/brynet/net/wrapper/ConnectionBuilder.hpp @@ -108,7 +108,7 @@ template class BaseConnectionBuilder { public: - Derived& WithService(TcpService::Ptr service) + Derived& WithService(ITcpService::Ptr service) { mTcpService = std::move(service); return static_cast(*this); @@ -198,7 +198,7 @@ class BaseConnectionBuilder } private: - TcpService::Ptr mTcpService; + ITcpService::Ptr mTcpService; ConnectionOption mOption; SocketConnectBuilder mConnectBuilder; }; diff --git a/include/brynet/net/wrapper/HttpConnectionBuilder.hpp b/include/brynet/net/wrapper/HttpConnectionBuilder.hpp index 2b48e213..f297b6ef 100644 --- a/include/brynet/net/wrapper/HttpConnectionBuilder.hpp +++ b/include/brynet/net/wrapper/HttpConnectionBuilder.hpp @@ -9,7 +9,7 @@ namespace brynet { namespace net { namespace wrapper { class HttpConnectionBuilder { public: - HttpConnectionBuilder& WithService(TcpService::Ptr service) + HttpConnectionBuilder& WithService(ITcpService::Ptr service) { mBuilder.WithService(std::move(service)); return *this; diff --git a/include/brynet/net/wrapper/HttpServiceBuilder.hpp b/include/brynet/net/wrapper/HttpServiceBuilder.hpp index 1c6abd1c..70e6c945 100644 --- a/include/brynet/net/wrapper/HttpServiceBuilder.hpp +++ b/include/brynet/net/wrapper/HttpServiceBuilder.hpp @@ -9,7 +9,7 @@ namespace brynet { namespace net { namespace wrapper { class HttpListenerBuilder { public: - HttpListenerBuilder& WithService(TcpService::Ptr service) + HttpListenerBuilder& WithService(ITcpService::Ptr service) { mBuilder.WithService(std::move(service)); return *this; diff --git a/include/brynet/net/wrapper/ServiceBuilder.hpp b/include/brynet/net/wrapper/ServiceBuilder.hpp index 0694b807..4aa1148e 100644 --- a/include/brynet/net/wrapper/ServiceBuilder.hpp +++ b/include/brynet/net/wrapper/ServiceBuilder.hpp @@ -14,7 +14,7 @@ class BaseListenerBuilder public: virtual ~BaseListenerBuilder() = default; - Derived& WithService(TcpService::Ptr service) + Derived& WithService(ITcpService::Ptr service) { mTcpService = std::move(service); return static_cast(*this); @@ -99,7 +99,7 @@ class BaseListenerBuilder } private: - TcpService::Ptr mTcpService; + ITcpService::Ptr mTcpService; std::vector mSocketProcessCallbacks; ConnectionOption mSocketOption; std::string mListenAddr; diff --git a/tests/test_http.cpp b/tests/test_http.cpp index 911fc95a..be6c3cdf 100644 --- a/tests/test_http.cpp +++ b/tests/test_http.cpp @@ -30,7 +30,7 @@ TEST_CASE("http server are computed", "[http_server]") auto connector = AsyncConnector::Create(); connector->startWorkerThread(); - auto service = TcpService::Create(); + auto service = IOThreadTcpService::Create(); service->startWorkerThread(1); auto httpEnterCallback = [](const HTTPParser& httpParser, diff --git a/tests/test_sync_connect.cpp b/tests/test_sync_connect.cpp index 5d2dc28c..3597c880 100644 --- a/tests/test_sync_connect.cpp +++ b/tests/test_sync_connect.cpp @@ -15,7 +15,7 @@ TEST_CASE("SyncConnector are computed", "[sync_connect]") { {}} - // 监听服务未开启 + // listen service not open { {auto connector = AsyncConnector::Create(); connector->startWorkerThread(); @@ -32,7 +32,7 @@ TEST_CASE("SyncConnector are computed", "[sync_connect]") { auto connector = AsyncConnector::Create(); connector->startWorkerThread(); - auto service = TcpService::Create(); + auto service = IOThreadTcpService::Create(); service->startWorkerThread(1); wrapper::ConnectionBuilder connectionBuilder; @@ -47,9 +47,9 @@ TEST_CASE("SyncConnector are computed", "[sync_connect]") } } -// 使用ListenerBuilder开启监听 +// use ListenerBuilder for open listen { - auto service = TcpService::Create(); + auto service = IOThreadTcpService::Create(); service->startWorkerThread(1); wrapper::ListenerBuilder listenerBuilder; listenerBuilder.WithService(service) @@ -84,7 +84,7 @@ TEST_CASE("SyncConnector are computed", "[sync_connect]") REQUIRE(socket == nullptr); } -// 开启监听服务 +// open listen thread { auto listenThread = brynet::net::ListenThread::Create(false, ip, @@ -106,11 +106,11 @@ TEST_CASE("SyncConnector are computed", "[sync_connect]") REQUIRE(socket != nullptr); } - // Tcp Service 开启工作线程 + // Tcp Service start io work thread { auto connector = AsyncConnector::Create(); connector->startWorkerThread(); - auto service = TcpService::Create(); + auto service = IOThreadTcpService::Create(); service->startWorkerThread(1); wrapper::ConnectionBuilder connectionBuilder; @@ -124,11 +124,11 @@ TEST_CASE("SyncConnector are computed", "[sync_connect]") REQUIRE(session != nullptr); } - // Tcp Service 没开启工作线程 + // Tcp Service not open io work thread { auto connector = AsyncConnector::Create(); connector->startWorkerThread(); - auto service = TcpService::Create(); + auto service = IOThreadTcpService::Create(); wrapper::ConnectionBuilder connectionBuilder; auto session = connectionBuilder @@ -140,9 +140,28 @@ TEST_CASE("SyncConnector are computed", "[sync_connect]") REQUIRE(session == nullptr); } + + // use EventLoop for Tcp Service + { + auto connector = AsyncConnector::Create(); + connector->startWorkerThread(); + auto eventLoop = std::make_shared(); + eventLoop->bindCurrentThread(); + auto service = EventLoopTcpService::Create(eventLoop); + + wrapper::ConnectionBuilder connectionBuilder; + auto session = connectionBuilder + .WithService(service) + .WithConnector(connector) + .WithTimeout(std::chrono::seconds(2)) + .WithAddr(ip, port) + .syncConnect(); + + REQUIRE(session != nullptr); + } } -// 上个语句块的监听线程结束 +// listen thread exit { auto connector = AsyncConnector::Create(); connector->startWorkerThread();