Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: kill connections correctly in Net WorkThread #2862

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 40 additions & 7 deletions src/net/src/worker_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,54 @@ void WorkerThread::DoCronTask() {
++iter;
}
}
/*
* How Do we kill a conn correct:
* stage 1: stop accept new request(also give up the write back of shooting request's response)
* 1.1 remove the fd from epoll and erase it from conns_ to ensure no more request will submit to threadpool
* 1.2 add to-close-conn to wait_to_close_conns_
* stage 2: ensure there's no other shared_ptr of this conn in pika
* 2.1 in async task that exec by TheadPool, a shared_ptr of conn will hold and my case a pipe event to tell the epoll
* to back the response, we must ensure this notification is done before we really close fd(linux will reuse the fd to accept new conn)
* 2.2 we must clear all other shared_ptr of this to-close-conn, like the map of blpop/brpop and the map of watchkeys
* 2.3 for those to-close-conns that ref count drop to 1, we add them to ready-to-close-conns_
* stage 3: after an epoll cycle(let it handle the already-invalid-writeback-notification ), we can safely close the fds of ready_to_close_conns_
*/

for (auto& conn : ready_to_close_conns_) {
close(conn->fd());
}
ready_to_close_conns_.clear();

for (auto conn = wait_to_close_conns_.begin(); conn != wait_to_close_conns_.end();) {
if (conn->use_count() == 1) {
ready_to_close_conns_.push_back(*conn);
conn = wait_to_close_conns_.erase(conn);
} else {
++conn;
}
}

for (const auto& conn : to_close) {
net_multiplexer_->NetDelEvent(conn->fd(), 0);
CloseFd(conn);
ClearConnsRefAndOtherInfo(conn);
wait_to_close_conns_.push_back(conn);
}
for (const auto& conn : to_timeout) {
net_multiplexer_->NetDelEvent(conn->fd(), 0);
CloseFd(conn);
ClearConnsRefAndOtherInfo(conn);
wait_to_close_conns_.push_back(conn);
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
server_thread_->handle_->FdTimeoutHandle(conn->fd(), conn->ip_port());
}
}

void WorkerThread::ClearConnsRefAndOtherInfo(const std::shared_ptr<NetConn>& conn) {
if (auto dispatcher = dynamic_cast<DispatchThread *>(server_thread_); dispatcher != nullptr ) {
//check if this conn disconnected from being blocked by blpop/brpop
dispatcher->ClosingConnCheckForBlrPop(std::dynamic_pointer_cast<net::RedisConn>(conn));
dispatcher->RemoveWatchKeys(conn);
}
}

bool WorkerThread::TryKillConn(const std::string& ip_port) {
bool find = false;
if (ip_port != kKillAllConnsTask) {
Expand All @@ -301,12 +338,8 @@ bool WorkerThread::TryKillConn(const std::string& ip_port) {
}

void WorkerThread::CloseFd(const std::shared_ptr<NetConn>& conn) {
ClearConnsRefAndOtherInfo(conn);
close(conn->fd());
if (auto dispatcher = dynamic_cast<DispatchThread *>(server_thread_); dispatcher != nullptr ) {
//check if this conn disconnected from being blocked by blpop/brpop
dispatcher->ClosingConnCheckForBlrPop(std::dynamic_pointer_cast<net::RedisConn>(conn));
dispatcher->RemoveWatchKeys(conn);
}
server_thread_->handle_->FdClosedHandle(conn->fd(), conn->ip_port());
}

Expand Down
5 changes: 5 additions & 0 deletions src/net/src/worker_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,15 @@ class WorkerThread : public Thread {
NetMultiplexer* net_multiplexer() { return net_multiplexer_.get(); }
bool TryKillConn(const std::string& ip_port);

void ClearConnsRefAndOtherInfo(const std::shared_ptr<NetConn>& conn);

ServerThread* GetServerThread() { return server_thread_; }

mutable pstd::RWMutex rwlock_; /* For external statistics */
std::map<int, std::shared_ptr<NetConn>> conns_;
std::vector<std::shared_ptr<NetConn>> wait_to_close_conns_;
std::vector<std::shared_ptr<NetConn>> ready_to_close_conns_;


void* private_data_ = nullptr;

Expand Down
Loading