Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: kill connections correctly in Net WorkThread #2862

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions src/net/src/worker_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,55 @@ void WorkerThread::DoCronTask() {
++iter;
}
}
/*
 * How do we kill a conn correctly:
 * stage 1: stop accepting new requests (also give up writing back the in-flight requests' responses)
 *   1.1 remove the fd from epoll and erase it from conns_ to ensure no more requests are submitted to the thread pool
 *   1.2 add the to-close conn to wait_to_close_conns_
 * stage 2: ensure there is no other shared_ptr of this conn anywhere else in pika
 *   2.1 an async task executed by the ThreadPool may hold a shared_ptr of the conn and may post a pipe event to tell the epoll
 *       to write back the response; we must ensure this notification is handled before we really close the fd (linux will reuse the fd for a new conn)
 *   2.2 we must clear all other shared_ptrs of this to-close conn, e.g. the blpop/brpop map and the watchkeys map
 *   2.3 for those to-close conns whose ref count has dropped to 1, we add them to ready_to_close_conns_
 * stage 3: after one more epoll cycle (letting it consume any already-invalid write-back notifications), we can safely close the fds in ready_to_close_conns_
 */

for (auto& conn : ready_to_close_conns_) {
close(conn->fd());
server_thread_->handle_->FdClosedHandle(conn->fd(), conn->ip_port());
}
ready_to_close_conns_.clear();

for (auto conn = wait_to_close_conns_.begin(); conn != wait_to_close_conns_.end();) {
if (conn->use_count() == 1) {
ready_to_close_conns_.push_back(*conn);
conn = wait_to_close_conns_.erase(conn);
} else {
++conn;
}
}

for (const auto& conn : to_close) {
net_multiplexer_->NetDelEvent(conn->fd(), 0);
CloseFd(conn);
ClearConnsRefAndOtherInfo(conn);
wait_to_close_conns_.push_back(conn);
}
for (const auto& conn : to_timeout) {
net_multiplexer_->NetDelEvent(conn->fd(), 0);
CloseFd(conn);
ClearConnsRefAndOtherInfo(conn);
wait_to_close_conns_.push_back(conn);
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
server_thread_->handle_->FdTimeoutHandle(conn->fd(), conn->ip_port());
}
}

/*
 * Drop the auxiliary shared_ptr references to a closing connection that are
 * held outside conns_: the blpop/brpop blocking map and the watch-key map,
 * both owned by the DispatchThread. Does nothing when server_thread_ is not
 * a DispatchThread (e.g. a plain HolyThread deployment).
 */
void WorkerThread::ClearConnsRefAndOtherInfo(const std::shared_ptr<NetConn>& conn) {
  auto* dispatch_thread = dynamic_cast<DispatchThread*>(server_thread_);
  if (dispatch_thread == nullptr) {
    return;
  }
  // The conn may be parked in the blpop/brpop blocking map; detach it there first.
  dispatch_thread->ClosingConnCheckForBlrPop(std::dynamic_pointer_cast<net::RedisConn>(conn));
  // Also release any WATCH-key registrations that still reference this conn.
  dispatch_thread->RemoveWatchKeys(conn);
}

bool WorkerThread::TryKillConn(const std::string& ip_port) {
bool find = false;
if (ip_port != kKillAllConnsTask) {
Expand All @@ -301,12 +339,8 @@ bool WorkerThread::TryKillConn(const std::string& ip_port) {
}

/*
 * Close a connection's fd and notify the server handle.
 *
 * ClearConnsRefAndOtherInfo() is called first so the dispatcher-side maps
 * (blpop/brpop blocking map, watch-key map) no longer reference the conn;
 * the previous inline dispatcher-cleanup block here duplicated exactly that
 * helper and has been removed.
 *
 * NOTE(review): callers must guarantee that no async task still holds a
 * shared_ptr to this conn when the fd is closed, otherwise a pending
 * write-back notification could hit a reused fd — confirm against the
 * staged-close protocol in DoCronTask (wait_to_close_conns_ /
 * ready_to_close_conns_).
 */
void WorkerThread::CloseFd(const std::shared_ptr<NetConn>& conn) {
  ClearConnsRefAndOtherInfo(conn);
  close(conn->fd());
  server_thread_->handle_->FdClosedHandle(conn->fd(), conn->ip_port());
}

Expand Down
5 changes: 5 additions & 0 deletions src/net/src/worker_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,15 @@ class WorkerThread : public Thread {
NetMultiplexer* net_multiplexer() { return net_multiplexer_.get(); }
bool TryKillConn(const std::string& ip_port);

void ClearConnsRefAndOtherInfo(const std::shared_ptr<NetConn>& conn);

ServerThread* GetServerThread() { return server_thread_; }

mutable pstd::RWMutex rwlock_; /* For external statistics */
std::map<int, std::shared_ptr<NetConn>> conns_;
std::vector<std::shared_ptr<NetConn>> wait_to_close_conns_;
std::vector<std::shared_ptr<NetConn>> ready_to_close_conns_;


void* private_data_ = nullptr;

Expand Down
15 changes: 13 additions & 2 deletions tests/integration/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,14 +634,25 @@ var _ = Describe("should replication ", func() {

for i := 1; i <= 5; i++ {
go func() {
clientMaster.BLPop(ctx, 0, lists...)
client := redis.NewClient(PikaOption(MASTERADDR))
defer client.Close()

client.BLPop(ctx, 0, lists...)
}()
go func() {
clientMaster.BRPop(ctx, 0, lists...)
client := redis.NewClient(PikaOption(MASTERADDR))
defer client.Close()

client.BRPop(ctx, 0, lists...)
}()
}
execute(&ctx, clientMaster, 5, issuePushPopFrequency)


time.Sleep(3 * time.Second);
//reconnect to avoid timeout-kill
clientSlave := redis.NewClient(PikaOption(SLAVEADDR))
// Fail("Stopping the test due to some condition");
for i := int64(0); i < clientMaster.LLen(ctx, "blist0").Val(); i++ {
Expect(clientMaster.LIndex(ctx, "blist0", i)).To(Equal(clientSlave.LIndex(ctx, "blist0", i)))
}
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/rsync_dynamic_reconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ func RefillMaster(masterAddr string, dataVolumeMB int64, ctx context.Context) {
cli.Set(ctx, rKey, rValue, 0)
}
}
keySize := 1024
valueSize := 1024
keySize := 64
valueSize := 64
dataVolumeBytes := dataVolumeMB << 20
threadNum := 10
threadNum := 5
reqNumForEachThead := dataVolumeBytes / int64((keySize + valueSize)) / int64(threadNum)
//fmt.Printf("reqNumForEach:%d\n", reqNumForEachThead)
startTime := time.Now()
Expand Down Expand Up @@ -136,7 +136,7 @@ var _ = Describe("Rsync Reconfig Test", func() {
slave1.FlushDB(ctx)
master1.FlushDB(ctx)
time.Sleep(3 * time.Second)
RefillMaster(MASTERADDR, 64, ctx)
RefillMaster(MASTERADDR, 2, ctx)
key1 := "45vs45f4s5d6"
value1 := "afd54g5s4f545"
//set key before sync happened, slave is supposed to fetch it when sync done
Expand Down
21 changes: 14 additions & 7 deletions tests/integration/start_master_and_slave.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ mkdir slave_data
# Example Change the location for storing data on primary and secondary nodes in the configuration file
sed -i.bak \
-e 's|databases : 1|databases : 2|' \
-e 's|#daemonize : yes|daemonize : yes|' ./pika_single.conf
-e 's|#daemonize : yes|daemonize : yes|' \
-e 's|timeout : 60|timeout : 500|' ./pika_single.conf

sed -i.bak \
-e 's|databases : 1|databases : 2|' \
Expand All @@ -24,7 +25,8 @@ sed -i.bak \
-e 's|dump-path : ./dump/|dump-path : ./master_data/dump/|' \
-e 's|pidfile : ./pika.pid|pidfile : ./master_data/pika.pid|' \
-e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_data/dbsync/|' \
-e 's|#daemonize : yes|daemonize : yes|' ./pika_master.conf
-e 's|#daemonize : yes|daemonize : yes|' \
-e 's|timeout : 60|timeout : 500|' ./pika_master.conf

sed -i.bak \
-e 's|databases : 1|databases : 2|' \
Expand All @@ -34,7 +36,8 @@ sed -i.bak \
-e 's|dump-path : ./dump/|dump-path : ./slave_data/dump/|' \
-e 's|pidfile : ./pika.pid|pidfile : ./slave_data/pika.pid|' \
-e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_data/dbsync/|' \
-e 's|#daemonize : yes|daemonize : yes|' ./pika_slave.conf
-e 's|#daemonize : yes|daemonize : yes|' \
-e 's|timeout : 60|timeout : 500|' ./pika_slave.conf

sed -i.bak \
-e 's|# rename-command : FLUSHALL 360flushall|rename-command : FLUSHALL 360flushall|' \
Expand All @@ -46,7 +49,8 @@ sed -i.bak \
-e 's|dump-path : ./dump/|dump-path : ./rename_data/dump/|' \
-e 's|pidfile : ./pika.pid|pidfile : ./rename_data/pika.pid|' \
-e 's|db-sync-path : ./dbsync/|db-sync-path : ./rename_data/dbsync/|' \
-e 's|#daemonize : yes|daemonize : yes|' ./pika_rename.conf
-e 's|#daemonize : yes|daemonize : yes|' \
-e 's|timeout : 60|timeout : 500|' ./pika_rename.conf

sed -i.bak \
-e 's|requirepass :|requirepass : requirepass|' \
Expand All @@ -59,7 +63,8 @@ sed -i.bak \
-e 's|dump-path : ./dump/|dump-path : ./acl1_data/dump/|' \
-e 's|pidfile : ./pika.pid|pidfile : ./acl1_data/pika.pid|' \
-e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl1_data/dbsync/|' \
-e 's|#daemonize : yes|daemonize : yes|' ./pika_acl_both_password.conf
-e 's|#daemonize : yes|daemonize : yes|' \
-e 's|timeout : 60|timeout : 500|' ./pika_acl_both_password.conf

sed -i.bak \
-e 's|requirepass :|requirepass : requirepass|' \
Expand All @@ -71,7 +76,8 @@ sed -i.bak \
-e 's|dump-path : ./dump/|dump-path : ./acl2_data/dump/|' \
-e 's|pidfile : ./pika.pid|pidfile : ./acl2_data/pika.pid|' \
-e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl2_data/dbsync/|' \
-e 's|#daemonize : yes|daemonize : yes|' ./pika_acl_only_admin_password.conf
-e 's|#daemonize : yes|daemonize : yes|' \
-e 's|timeout : 60|timeout : 500|' ./pika_acl_only_admin_password.conf
sed -i.bak \
-e 's|requirepass :|requirepass : requirepass|' \
-e 's|masterauth :|masterauth : requirepass|' \
Expand All @@ -83,7 +89,8 @@ sed -i.bak \
-e 's|dump-path : ./dump/|dump-path : ./acl3_data/dump/|' \
-e 's|pidfile : ./pika.pid|pidfile : ./acl3_data/pika.pid|' \
-e 's|db-sync-path : ./dbsync/|db-sync-path : ./acl3_data/dbsync/|' \
-e 's|#daemonize : yes|daemonize : yes|' ./pika_has_other_acl_user.conf
-e 's|#daemonize : yes|daemonize : yes|' \
-e 's|timeout : 60|timeout : 500|' ./pika_has_other_acl_user.conf
echo -e '\nuser : limit on >limitpass ~* +@all &*' >> ./pika_has_other_acl_user.conf

# Start three nodes
Expand Down
Loading