Skip to content

Commit

Permalink
support specified event loop thread to processing IO.
Browse files Browse the repository at this point in the history
  • Loading branch information
IronsDu committed Mar 13, 2022
1 parent 7bcf604 commit 5c07012
Show file tree
Hide file tree
Showing 23 changed files with 323 additions and 86 deletions.
15 changes: 14 additions & 1 deletion docs/eventloop.zh-cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
当有事件产生时,做完工作后即可返回,所需时间依负荷而定.</br>
通常,我们会开启一个线程,在其中间断性的调用`loop`接口。

- `EventLoop::bindCurrentThread`

初始化调度EventLoop的thread id,用于使用EventLoop接口时时判断当前线程是否处于EventLoop::loop所在线程。在使用EventLoop的一些函数,比如runAsyncFunctor时,都需要执行bindCurrentThread(当然,你也可以直接调用loop来初始化)

- `EventLoop::wakeup(void)`

(线程安全)唤醒可能阻塞在`EventLoop::loop`中的等待。</br>
Expand All @@ -20,7 +24,7 @@

- `EventLoop::runAsyncFunctor(std::function<void(void)>)`

(线程安全)投递一个异步函数给`EventLoop`,此函数会在`EventLoop::loop`调用中被执行。
(线程安全)投递一个异步函数给`EventLoop`,此函数会在`EventLoop::loop`调用中被执行。如果此时EventLoop还没有初始化thread id(通过bindCurrentThread或loop),那么此函数会抛出异常,避免用户忘记调度IO工作线程时就投递了任务而导致逻辑错误。

- `EventLoop::runFunctorAfterLoop(std::function<void(void)>)`

Expand All @@ -31,6 +35,15 @@

(线程安全)检测当前线程是否和 `EventLoop::loop`所在线程(也就是最先调用`loop`接口的线程)一样。

- `brynet::base::Timer::WeakPtr EventLoop::runAfter(std::chrono::nanoseconds timeout, std::function<void(void)>&& callback)

开启一个延迟执行的函数,当到期时会在EventLoop工作线程(即loop函数所在线程)里执行callback。可通过返回的brynet::base::Timer::WeakPtr的cancel函数来取消定时器。

- `RepeatTimer::Ptr EventLoop::brynet::base::RepeatTimer::Ptr runIntervalTimer(std::chrono::nanoseconds timeout, std::function<void(void)>&& callback)`

开启一个重复执行的延迟执行的函数,当每次到期时会在EventLoop工作线程(即loop函数所在线程)里执行callback。可通过返回的RepeatTimer::Ptr的cancel函数来取消定时器。


# 注意事项
- 当我们第一次在某个线程中调用`loop`之后,就不应该在其他线程中调用`loop`(当然如果你调用了,也没有任何效果/影响)
- 如果没有任何线程调用`loop`,那么使用`pushAsyncProc`投递的异步函数将不会被执行,直到有线程调用了`loop`接口,这些异步函数才会被执行。
Expand Down
33 changes: 25 additions & 8 deletions docs/tcp_service.zh-cn.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,45 @@
# 概述
`TCPService``brynet`中对`TcpConnection``EventLoop`的封装组合。
`ITCPService``brynet`中对`TcpConnection``EventLoop`的封装抽象类型。它的派生类分为两个:`IOThreadTcpService``EventLoopTcpService`


`IOThreadTcpService`开启额外的IO工作线程,并使用其独立的EventLoop处理网络。

`EventLoopTcpService`则不会开启额外的线程,而是直接使用用户设定的EventLoop来处理网络,其线程就是调用`loop`函数来调度此EventLoop的线程,
它通常可以用于不需要极致性能的场景,比如直接在主线程处理IO。

源代码见:[TCPService.h](https://github.com/IronsDu/brynet/blob/master/include/brynet/net/TCPService.hpp).</br>

# 接口

- `TcpService::Create(void)`
- `ITcpService::addTcpConnection(TcpSocket::Ptr socket, Options...)`

这是ITcpService抽象类的接口,其用途是将一个TcpConnection交给具体的ITcpService管理,其中Options请查阅`AddSocketOption的WithXXX系列函数`

- `IOThreadTcpService::Create(void)`


此静态函数用于创建网络服务对象,且只能使用此接口创建`TcpService`对象。</br>
此静态函数用于创建网络服务对象,且只能使用此接口创建`ITcpService`对象。</br>
用户通过服务对象操作网络会话。

- `TcpService::startWorkerThread(size_t threadNum, FRAME_CALLBACK callback = nullptr)`
- `IOThreadTcpService::startWorkerThread(size_t threadNum, FRAME_CALLBACK callback = nullptr)`

(线程安全)开启IO工作线程,每一个工作线程有一个`EventLoop`负责事件检测。


- `IOThreadTcpService::stopWorkerThread()`

(线程安全)开启工作线程,每一个工作线程有一个`EventLoop`负责事件检测
(线程安全)关闭IO工作线程


- `EventLoopTcpService::Create(EventLoop::Ptr eventLoop)`

- `TcpService::addTcpConnection(TcpSocket::Ptr socket, Options...)`
根据用户指定的EventLoop构造一个单线程TcpService管理器。

(线程安全),将一个TcpConnection交给TcpService管理,其中Options请查阅`AddSocketOption的WithXXX系列函数`


## 示例
```C++
auto service = TcpService::Create();
auto service = IOThreadTcpService::Create();
// use blocking connect for test
auto fd = ox_socket_connect(false, ip, port);
auto socket = TcpSocket::Create(fd, false);
Expand Down
2 changes: 1 addition & 1 deletion examples/BenchWebsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ int main(int argc, char** argv)
});
};

auto service = TcpService::Create();
auto service = IOThreadTcpService::Create();
service->startWorkerThread(workers);

auto connector = AsyncConnector::Create();
Expand Down
4 changes: 2 additions & 2 deletions examples/BroadCastServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ std::atomic_llong SendPacketNum = ATOMIC_VAR_INIT(0);
std::atomic_llong RecvPacketNum = ATOMIC_VAR_INIT(0);

std::vector<TcpConnection::Ptr> clients;
TcpService::Ptr service;
IOThreadTcpService::Ptr service;

static void addClientID(const TcpConnection::Ptr& session)
{
Expand Down Expand Up @@ -74,7 +74,7 @@ int main(int argc, char** argv)
int threadNum = atoi(argv[2]);
brynet::net::base::InitSocket();

service = TcpService::Create();
service = IOThreadTcpService::Create();
auto mainLoop = std::make_shared<EventLoop>();
auto listenThread = ListenThread::Create(false, "0.0.0.0", port, [mainLoop](TcpSocket::Ptr socket) {
socket->setNodelay();
Expand Down
8 changes: 8 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ elseif(UNIX)
target_link_libraries(pingpongserver pthread)
endif()

add_executable(singlethread-pingpongserver SingleThreadPingPongServer.cpp ${HEADER_LIST})
if(WIN32)
target_link_libraries(singlethread-pingpongserver ws2_32)
elseif(UNIX)
find_package(Threads REQUIRED)
target_link_libraries(singlethread-pingpongserver pthread)
endif()

add_executable(pingpongclient PingPongClient.cpp)
if(WIN32)
target_link_libraries(pingpongclient ws2_32)
Expand Down
2 changes: 1 addition & 1 deletion examples/HttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ int main(int argc, char** argv)
(void) argc;
(void) argv;

auto service = TcpService::Create();
auto service = IOThreadTcpService::Create();
service->startWorkerThread(2);

auto connector = brynet::net::AsyncConnector::Create();
Expand Down
2 changes: 1 addition & 1 deletion examples/HttpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ int main(int argc, char** argv)
}

const auto port = std::atoi(argv[1]);
auto service = TcpService::Create();
auto service = IOThreadTcpService::Create();
service->startWorkerThread(atoi(argv[2]));

auto httpEnterCallback = [](const HTTPParser& httpParser,
Expand Down
2 changes: 1 addition & 1 deletion examples/PingPongClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ int main(int argc, char** argv)

std::string tmp(atoi(argv[5]), 'a');

auto service = TcpService::Create();
auto service = IOThreadTcpService::Create();
service->startWorkerThread(atoi(argv[3]));

auto connector = AsyncConnector::Create();
Expand Down
2 changes: 1 addition & 1 deletion examples/PingPongServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ int main(int argc, char** argv)
exit(-1);
}

auto service = TcpService::Create();
auto service = IOThreadTcpService::Create();
service->startWorkerThread(atoi(argv[2]));

auto enterCallback = [](const TcpConnection::Ptr& session) {
Expand Down
2 changes: 1 addition & 1 deletion examples/PromiseReceive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ int main(int argc, char** argv)
exit(-1);
}

auto service = TcpService::Create();
auto service = IOThreadTcpService::Create();
service->startWorkerThread(atoi(argv[2]));

auto enterCallback = [](const TcpConnection::Ptr& session) {
Expand Down
85 changes: 85 additions & 0 deletions examples/SingleThreadPingPongServer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#include <atomic>
#include <brynet/base/AppStatus.hpp>
#include <brynet/net/EventLoop.hpp>
#include <brynet/net/TcpService.hpp>
#include <brynet/net/wrapper/ServiceBuilder.hpp>
#include <iostream>
#include <mutex>

using namespace brynet;
using namespace brynet::net;

std::atomic_llong TotalRecvSize = ATOMIC_VAR_INIT(0);
std::atomic_llong total_client_num = ATOMIC_VAR_INIT(0);
std::atomic_llong total_packet_num = ATOMIC_VAR_INIT(0);

int main(int argc, char** argv)
{
if (argc != 3)
{
fprintf(stderr, "Usage: <listen port>\n");
exit(-1);
}

auto enterCallback = [](const TcpConnection::Ptr& session) {
total_client_num++;

session->setDataCallback([session](brynet::base::BasePacketReader& reader) {
session->send(reader.begin(), reader.size());
TotalRecvSize += reader.size();
total_packet_num++;
reader.consumeAll();
});

session->setDisConnectCallback([](const TcpConnection::Ptr& session) {
(void) session;
total_client_num--;
});
};

EventLoop::Ptr mainLoop = std::make_shared<EventLoop>();
mainLoop->bindCurrentThread();

auto service = EventLoopTcpService::Create(mainLoop);

wrapper::ListenerBuilder listener;
listener.WithService(service)
.AddSocketProcess({[](TcpSocket& socket) {
socket.setNodelay();
}})
.WithMaxRecvBufferSize(1024)
.AddEnterCallback(enterCallback)
.WithAddr(false, "0.0.0.0", atoi(argv[1]))
.asyncRun();

mainLoop->runIntervalTimer(std::chrono::seconds(1), [&]() {
if (TotalRecvSize / 1024 == 0)
{
std::cout << "total recv : " << TotalRecvSize << " bytes/s, of client num:" << total_client_num << std::endl;
}
else if ((TotalRecvSize / 1024) / 1024 == 0)
{
std::cout << "total recv : " << TotalRecvSize / 1024 << " K/s, of client num:" << total_client_num << std::endl;
}
else
{
std::cout << "total recv : " << (TotalRecvSize / 1024) / 1024 << " M/s, of client num:" << total_client_num << std::endl;
}

std::cout << "packet num:" << total_packet_num << std::endl;
total_packet_num = 0;
TotalRecvSize = 0;
});

while (true)
{
mainLoop->loop(1000);

if (brynet::base::app_kbhit())
{
break;
}
}

return 0;
}
2 changes: 1 addition & 1 deletion examples/WebBinaryProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ int main(int argc, char** argv)
string backendIP = argv[2];
int backendPort = atoi(argv[3]);

auto tcpService = TcpService::Create();
auto tcpService = IOThreadTcpService::Create();
tcpService->startWorkerThread(std::thread::hardware_concurrency());

auto asyncConnector = AsyncConnector::Create();
Expand Down
14 changes: 14 additions & 0 deletions include/brynet/base/Timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,20 @@ class TimerMgr final : public std::enable_shared_from_this<TimerMgr>
}
}

template<typename F, typename... TArgs>
void helperAddIntervalTimer(
RepeatTimer::Ptr repeatTimer,
std::chrono::nanoseconds interval,
F&& callback,
TArgs&&... args)
{
auto sharedThis = shared_from_this();
auto wrapperCallback = std::bind(std::forward<F>(callback), std::forward<TArgs>(args)...);
addTimer(interval, [sharedThis, interval, wrapperCallback, repeatTimer]() {
stubRepeatTimerCallback(sharedThis, interval, wrapperCallback, repeatTimer);
});
}

private:
static void stubRepeatTimerCallback(TimerMgr::Ptr timerMgr,
std::chrono::nanoseconds interval,
Expand Down
31 changes: 31 additions & 0 deletions include/brynet/net/EventLoop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class EventLoop : public brynet::base::NonCopyable
reAllocEventSize(1024);
mSelfThreadID = -1;
mTimer = std::make_shared<brynet::base::TimerMgr>();
mSelfThreadIDIsInitialized.store(false);
}

virtual ~EventLoop() BRYNET_NOEXCEPT
Expand All @@ -90,6 +91,11 @@ class EventLoop : public brynet::base::NonCopyable
#endif
}

void bindCurrentThread()
{
tryInitThreadID();
}

void loop(int64_t milliseconds)
{
tryInitThreadID();
Expand Down Expand Up @@ -264,6 +270,10 @@ class EventLoop : public brynet::base::NonCopyable

void runAsyncFunctor(UserFunctor&& f)
{
if (!mSelfThreadIDIsInitialized.load())
{
throw std::runtime_error("thread id not initialized, you need call `bindCurrentThread` first");
}
if (isInLoopThread())
{
f();
Expand All @@ -274,6 +284,7 @@ class EventLoop : public brynet::base::NonCopyable
wakeup();
}
}

void runFunctorAfterLoop(UserFunctor&& f)
{
assert(isInLoopThread());
Expand All @@ -284,6 +295,7 @@ class EventLoop : public brynet::base::NonCopyable

mAfterLoopFunctors.emplace_back(std::move(f));
}

brynet::base::Timer::WeakPtr runAfter(std::chrono::nanoseconds timeout, UserFunctor&& callback)
{
auto timer = std::make_shared<brynet::base::Timer>(
Expand All @@ -306,6 +318,23 @@ class EventLoop : public brynet::base::NonCopyable
return timer;
}

brynet::base::RepeatTimer::Ptr runIntervalTimer(std::chrono::nanoseconds timeout, UserFunctor&& callback)
{
if (isInLoopThread())
{
return mTimer->addIntervalTimer(timeout, callback);
}
else
{
auto timer = std::make_shared<brynet::base::RepeatTimer>();
auto timerMgr = mTimer;
runAsyncFunctor([timerMgr, timer, timeout, callback]() {
timerMgr->helperAddIntervalTimer(timer, timeout, callback);
});
return timer;
}
}

inline bool isInLoopThread() const
{
return mSelfThreadID == current_thread::tid();
Expand Down Expand Up @@ -403,6 +432,7 @@ class EventLoop : public brynet::base::NonCopyable
{
std::call_once(mOnceInitThreadID, [this]() {
mSelfThreadID = current_thread::tid();
mSelfThreadIDIsInitialized.store(true);
});
}

Expand Down Expand Up @@ -433,6 +463,7 @@ class EventLoop : public brynet::base::NonCopyable
std::vector<UserFunctor> mCopyAfterLoopFunctors;

std::once_flag mOnceInitThreadID;
std::atomic_bool mSelfThreadIDIsInitialized;
current_thread::THREAD_ID_TYPE mSelfThreadID;

brynet::base::TimerMgr::Ptr mTimer;
Expand Down
Loading

0 comments on commit 5c07012

Please sign in to comment.