fix: kill connections correctly in Net WorkThread (#2862)
* kill conns correctly in WorkerThread
cheniujh authored and brother-jin committed Aug 13, 2024
1 parent 0c2e083 commit 88f8d70
Showing 4 changed files with 63 additions and 13 deletions.
48 changes: 41 additions & 7 deletions src/net/src/worker_thread.cc
@@ -270,17 +270,55 @@ void WorkerThread::DoCronTask() {
       ++iter;
     }
   }
+  /*
+   * How do we kill a conn correctly:
+   * stage 1: stop accepting new requests (and give up writing back in-flight requests' responses)
+   *   1.1 remove the fd from epoll and erase the conn from conns_ so no more requests are submitted to the thread pool
+   *   1.2 add the to-close conn to wait_to_close_conns_
+   * stage 2: ensure no other shared_ptr of this conn remains anywhere in pika
+   *   2.1 an async task run by the ThreadPool holds a shared_ptr of the conn and may cause a pipe event telling the epoll
+   *   loop to write back the response; that notification must be handled before we really close the fd (Linux will reuse the fd for new conns)
+   *   2.2 we must clear every other shared_ptr of this to-close conn, e.g. the blpop/brpop map and the watch-keys map
+   *   2.3 once a to-close conn's ref count drops to 1, we move it to ready_to_close_conns_
+   * stage 3: after one more epoll cycle (letting it consume the now-invalid write-back notification), we can safely close the fds in ready_to_close_conns_
+   */
+
+  for (auto& conn : ready_to_close_conns_) {
+    close(conn->fd());
+    server_thread_->handle_->FdClosedHandle(conn->fd(), conn->ip_port());
+  }
+  ready_to_close_conns_.clear();
+
+  for (auto conn = wait_to_close_conns_.begin(); conn != wait_to_close_conns_.end();) {
+    if (conn->use_count() == 1) {
+      ready_to_close_conns_.push_back(*conn);
+      conn = wait_to_close_conns_.erase(conn);
+    } else {
+      ++conn;
+    }
+  }
+
   for (const auto& conn : to_close) {
     net_multiplexer_->NetDelEvent(conn->fd(), 0);
-    CloseFd(conn);
+    ClearConnsRefAndOtherInfo(conn);
+    wait_to_close_conns_.push_back(conn);
   }
   for (const auto& conn : to_timeout) {
     net_multiplexer_->NetDelEvent(conn->fd(), 0);
-    CloseFd(conn);
+    ClearConnsRefAndOtherInfo(conn);
+    wait_to_close_conns_.push_back(conn);
     server_thread_->handle_->FdTimeoutHandle(conn->fd(), conn->ip_port());
   }
 }
+
+void WorkerThread::ClearConnsRefAndOtherInfo(const std::shared_ptr<NetConn>& conn) {
+  if (auto dispatcher = dynamic_cast<DispatchThread*>(server_thread_); dispatcher != nullptr) {
+    // check whether this conn disconnected while being blocked by blpop/brpop
+    dispatcher->ClosingConnCheckForBlrPop(std::dynamic_pointer_cast<net::RedisConn>(conn));
+    dispatcher->RemoveWatchKeys(conn);
+  }
+}

 bool WorkerThread::TryKillConn(const std::string& ip_port) {
   bool find = false;
   if (ip_port != kKillAllConnsTask) {
@@ -301,12 +339,8 @@ bool WorkerThread::TryKillConn(const std::string& ip_port) {
 }

 void WorkerThread::CloseFd(const std::shared_ptr<NetConn>& conn) {
+  ClearConnsRefAndOtherInfo(conn);
   close(conn->fd());
-  if (auto dispatcher = dynamic_cast<DispatchThread *>(server_thread_); dispatcher != nullptr ) {
-    //check if this conn disconnected from being blocked by blpop/brpop
-    dispatcher->ClosingConnCheckForBlrPop(std::dynamic_pointer_cast<net::RedisConn>(conn));
-    dispatcher->RemoveWatchKeys(conn);
-  }
   server_thread_->handle_->FdClosedHandle(conn->fd(), conn->ip_port());
 }

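The three-stage scheme in the comment above boils down to: drop the fd from epoll, park the connection until the event loop is its last owner, then close the fd one cycle later. Below is a minimal, self-contained sketch of that deferred-close pattern, using hypothetical Conn/Loop types rather than Pika's actual classes; only the shared_ptr::use_count() logic mirrors the code above.

// A sketch of the deferred-close pattern (hypothetical Conn/Loop types, not
// Pika's actual classes). The invariant it demonstrates: an fd is only
// close()d once the event loop is the connection's last owner
// (shared_ptr::use_count() == 1) and one extra cron cycle has passed, so the
// kernel cannot hand the same fd number to a new connection while an
// in-flight thread-pool task still refers to the old one.
#include <cstdio>
#include <memory>
#include <vector>

struct Conn {
  explicit Conn(int fd) : fd_(fd) {}
  int fd() const { return fd_; }
 private:
  int fd_;
};

class Loop {
 public:
  // Stage 1: the conn is already out of epoll/conns_; park it here.
  void MarkForClose(std::shared_ptr<Conn> c) { wait_to_close_.push_back(std::move(c)); }

  void Cron() {
    // Stage 3: sole-owned since the previous cycle; now safe to close.
    for (auto& c : ready_to_close_) {
      std::printf("close(%d)\n", c->fd());  // real code would call ::close(c->fd())
    }
    ready_to_close_.clear();

    // Stage 2: promote conns whose only remaining owner is this loop.
    for (auto it = wait_to_close_.begin(); it != wait_to_close_.end();) {
      if (it->use_count() == 1) {
        ready_to_close_.push_back(*it);
        it = wait_to_close_.erase(it);
      } else {
        ++it;  // some task still holds a reference; retry next cycle
      }
    }
  }

 private:
  std::vector<std::shared_ptr<Conn>> wait_to_close_;
  std::vector<std::shared_ptr<Conn>> ready_to_close_;
};

int main() {
  Loop loop;
  auto conn = std::make_shared<Conn>(42);
  auto in_flight_task = conn;          // simulates a queued thread-pool task
  loop.MarkForClose(std::move(conn));

  loop.Cron();             // use_count == 2: neither promoted nor closed
  in_flight_task.reset();  // the "task" finishes and drops its reference
  loop.Cron();             // use_count == 1: promoted to ready_to_close_
  loop.Cron();             // closed only now, one full cycle later
}

Running it prints close(42) on the third cron pass only, one full cycle after the last outside reference disappeared.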
5 changes: 5 additions & 0 deletions src/net/src/worker_thread.h
@@ -48,10 +48,15 @@ class WorkerThread : public Thread {
   NetMultiplexer* net_multiplexer() { return net_multiplexer_.get(); }
   bool TryKillConn(const std::string& ip_port);
 
+  void ClearConnsRefAndOtherInfo(const std::shared_ptr<NetConn>& conn);
+
   ServerThread* GetServerThread() { return server_thread_; }
 
   mutable pstd::RWMutex rwlock_; /* For external statistics */
   std::map<int, std::shared_ptr<NetConn>> conns_;
+  std::vector<std::shared_ptr<NetConn>> wait_to_close_conns_;
+  std::vector<std::shared_ptr<NetConn>> ready_to_close_conns_;
+
 
   void* private_data_ = nullptr;
 
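A note on the two new members: wait_to_close_conns_ holds connections already removed from epoll and conns_ but possibly still referenced by in-flight tasks, while ready_to_close_conns_ holds connections whose sole remaining owner is the worker thread, to be closed on the next cron pass. In this diff both appear to be touched only from DoCronTask, so they ride on the cron task's single-threaded execution rather than on rwlock_.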
15 changes: 13 additions & 2 deletions tests/integration/replication_test.go
@@ -637,14 +637,25 @@ var _ = Describe("should replication ", func() {

 	for i := 1; i <= 5; i++ {
 		go func() {
-			clientMaster.BLPop(ctx, 0, lists...)
+			client := redis.NewClient(PikaOption(MASTERADDR))
+			defer client.Close()
+
+			client.BLPop(ctx, 0, lists...)
 		}()
 		go func() {
-			clientMaster.BRPop(ctx, 0, lists...)
+			client := redis.NewClient(PikaOption(MASTERADDR))
+			defer client.Close()
+
+			client.BRPop(ctx, 0, lists...)
 		}()
 	}
 	execute(&ctx, clientMaster, 5, issuePushPopFrequency)
 
+
+	time.Sleep(3 * time.Second)
+	// reconnect to avoid timeout-kill
+	clientSlave := redis.NewClient(PikaOption(SLAVEADDR))
+	// Fail("Stopping the test due to some condition")
 	for i := int64(0); i < clientMaster.LLen(ctx, "blist0").Val(); i++ {
 		Expect(clientMaster.LIndex(ctx, "blist0", i)).To(Equal(clientSlave.LIndex(ctx, "blist0", i)))
 	}
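Why the test needed its own clients: BLPop/BRPop are blocking commands, so each call pins a pooled connection until data arrives or the server kills the connection. Sharing clientMaster across five blocked goroutines can exhaust, and once those conns are killed, poison, the very pool the rest of the test relies on. Giving every goroutine its own short-lived client isolates the blocked connections, and re-creating clientSlave after the sleep avoids asserting through a connection the server may have timeout-killed in the meantime.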
8 changes: 4 additions & 4 deletions tests/integration/rsync_dynamic_reconfig.go
@@ -35,10 +35,10 @@ func RefillMaster(masterAddr string, dataVolumeMB int64, ctx context.Context) {
 			cli.Set(ctx, rKey, rValue, 0)
 		}
 	}
-	keySize := 1024
-	valueSize := 1024
+	keySize := 64
+	valueSize := 64
 	dataVolumeBytes := dataVolumeMB << 20
-	threadNum := 10
+	threadNum := 5
 	reqNumForEachThead := dataVolumeBytes / int64((keySize + valueSize)) / int64(threadNum)
 	//fmt.Printf("reqNumForEach:%d\n", reqNumForEachThead)
 	startTime := time.Now()
@@ -136,7 +136,7 @@ var _ = Describe("Rsync Reconfig Test", func() {
 		slave1.FlushDB(ctx)
 		master1.FlushDB(ctx)
 		time.Sleep(3 * time.Second)
-		RefillMaster(MASTERADDR, 64, ctx)
+		RefillMaster(MASTERADDR, 2, ctx)
 		key1 := "45vs45f4s5d6"
 		value1 := "afd54g5s4f545"
 		//set key before sync happened, slave is supposed to fetch it when sync done
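For scale: reqNumForEachThead = dataVolumeBytes / (keySize + valueSize) / threadNum, so the new RefillMaster(MASTERADDR, 2, ctx) call issues (2 << 20) / (64 + 64) / 5 = 3276 requests per thread, the same per-thread count as the old 64 MB / (1024 + 1024) / 10 configuration. The refill now moves 32x less data (16x smaller key/value pairs, half the threads) without changing the request pattern each thread drives.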
