Skip to content

Commit

Permalink
1. fix kill conns logic
Browse files Browse the repository at this point in the history
2. move the position of check closing fd of blpop/brpop
3. clean tcl test's data in pika.yaml
  • Loading branch information
cheniujh committed Aug 10, 2024
1 parent b351d48 commit 3796bee
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 14 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:

- name: Unit Test
working-directory: ${{ github.workspace }}
run: ./pikatests.sh all
run: ./pikatests.sh all clean

# master on port 9221, slave on port 9231, all with 2 db
- name: Start codis, pika master and pika slave
Expand Down Expand Up @@ -135,7 +135,7 @@ jobs:

- name: Unit Test
working-directory: ${{ github.workspace }}
run: ./pikatests.sh all
run: ./pikatests.sh all clean

- name: Start codis, pika master and pika slave
working-directory: ${{ github.workspace }}/build
Expand Down Expand Up @@ -197,7 +197,7 @@ jobs:
- name: Unit Test
working-directory: ${{ github.workspace }}
run: |
./pikatests.sh all
./pikatests.sh all clean
- name: Start codis, pika master and pika slave
working-directory: ${{ github.workspace }}/build
Expand Down
5 changes: 5 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,11 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
int8_t SubCmdIndex(const std::string& cmdName); // if the command no subCommand,return -1;

void Initial(const PikaCmdArgsType& argv, const std::string& db_name);
// Sometimes a cmd needs to be initialized outside the execute path, while other locks are already held.
// However, g_pika_server->GetDB() and g_pika_server->GetSyncMasterDB() both need to take a big lock (a big mutex).
// To avoid the risk of deadlock in this scenario, call GetDB and GetSyncMasterDB before entering the lock scope
// and just pass the results in as args (thus no big lock is taken within Initial).
void Initial(const PikaCmdArgsType& argv, const std::string& db_name, std::shared_ptr<DB> db, std::shared_ptr<SyncMasterDB> sync_db);
uint32_t flag() const;
bool hasFlag(uint32_t flag) const;
bool is_read() const;
Expand Down
5 changes: 4 additions & 1 deletion include/pika_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
#ifndef PIKA_LIST_H_
#define PIKA_LIST_H_

#include <memory>
#include "include/acl.h"
#include "include/pika_command.h"
#include "pika_server.h"
#include "storage/storage.h"

/*
Expand Down Expand Up @@ -105,7 +107,8 @@ class BlockingBaseCmd : public Cmd {
void BlockThisClientToWaitLRPush(BlockKeyType block_pop_type, std::vector<std::string>& keys, int64_t expire_time);
void TryToServeBLrPopWithThisKey(const std::string& key, std::shared_ptr<DB> db);
static void ServeAndUnblockConns(void* args);
static void WriteBinlogOfPopAndUpdateCache(std::vector<WriteBinlogOfPopArgs>& pop_args);
static void WriteBinlogOfPopAndUpdateCache(std::vector<WriteBinlogOfPopArgs>& pop_args, std::shared_ptr<DB> db,
std::shared_ptr<SyncMasterDB> sync_db);
void removeDuplicates(std::vector<std::string>& keys_);
// blpop/brpop used functions end
};
Expand Down
2 changes: 1 addition & 1 deletion src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ void DispatchThread::CleanWaitNodeOfUnBlockedBlrConn(std::shared_ptr<net::RedisC
// remove all the waiting info of this conn (cleanup work)
auto pair = blocked_conn_to_keys_.find(conn_unblocked->fd());
if (pair == blocked_conn_to_keys_.end()) {
LOG(WARNING) << "blocking info of blpop/brpop went wrong, blpop/brpop can't working correctly";
LOG(ERROR) << "blocking info of blpop/brpop went wrong, blpop/brpop can't working correctly";
return;
}
auto& blpop_keys_list = pair->second;
Expand Down
7 changes: 4 additions & 3 deletions src/net/src/worker_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,6 @@ void* WorkerThread::ThreadMain() {
}

if (((pfe->mask & kErrorEvent) != 0) || (should_close != 0)) {
//check if this conn disconnected from being blocked by blpop/brpop
dynamic_cast<net::DispatchThread*>(server_thread_)->ClosingConnCheckForBlrPop(std::dynamic_pointer_cast<net::RedisConn>(in_conn));
net_multiplexer_->NetDelEvent(pfe->fd, 0);
CloseFd(in_conn);
in_conn = nullptr;
Expand Down Expand Up @@ -235,7 +233,6 @@ void WorkerThread::DoCronTask() {
}
conns_.clear();
deleting_conn_ipport_.clear();
return;
}

auto iter = conns_.begin();
Expand Down Expand Up @@ -274,9 +271,11 @@ void WorkerThread::DoCronTask() {
}
}
for (const auto& conn : to_close) {
net_multiplexer_->NetDelEvent(conn->fd(), 0);
CloseFd(conn);
}
for (const auto& conn : to_timeout) {
net_multiplexer_->NetDelEvent(conn->fd(), 0);
CloseFd(conn);
server_thread_->handle_->FdTimeoutHandle(conn->fd(), conn->ip_port());
}
Expand All @@ -302,6 +301,8 @@ bool WorkerThread::TryKillConn(const std::string& ip_port) {
}

void WorkerThread::CloseFd(const std::shared_ptr<NetConn>& conn) {
// check whether this conn is being closed while it is blocked by blpop/brpop
dynamic_cast<net::DispatchThread*>(server_thread_)->ClosingConnCheckForBlrPop(std::dynamic_pointer_cast<net::RedisConn>(conn));
close(conn->fd());
if (auto dispatcher = dynamic_cast<DispatchThread *>(server_thread_); dispatcher != nullptr ) {
dispatcher->RemoveWatchKeys(conn);
Expand Down
11 changes: 11 additions & 0 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,17 @@ void Cmd::Initial(const PikaCmdArgsType& argv, const std::string& db_name) {
DoInitial();
};

// Initializes this command from `argv` and `db_name`, using DB handles supplied by the caller.
//
// This overload performs no DB / SyncMasterDB lookup of its own: it only stores the `db` and `sync_db`
// it is given, so the caller can fetch them before entering its own lock scope.
//
// @param argv     command arguments, copied into argv_
// @param db_name  name of the target DB, copied into db_name_
// @param db       DB handle, moved into db_
// @param sync_db  SyncMasterDB handle, moved into sync_db_
void Cmd::Initial(const PikaCmdArgsType& argv, const std::string& db_name, std::shared_ptr<DB> db,
std::shared_ptr<SyncMasterDB> sync_db) {
// Copy the by-reference inputs first, before any previously held handles are replaced below.
argv_ = argv;
db_name_ = db_name;
res_.clear(); // Discard any response content left from earlier use
// Take over the caller-provided handles; the parameters are left empty after the moves.
db_ = std::move(db);
sync_db_ = std::move(sync_db);
Clear(); // Reset command state; derived classes may provide their own implementation
// Run the command-specific initialization last, after the state above has been set up.
DoInitial();
}

std::vector<std::string> Cmd::current_key() const { return {""}; }

void Cmd::Execute() {
Expand Down
17 changes: 11 additions & 6 deletions src/pika_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// of patent rights can be found in the PATENTS file in the same directory.

#include "include/pika_list.h"
#include <memory>
#include <utility>
#include "include/pika_cache.h"
#include "include/pika_data_distribution.h"
Expand Down Expand Up @@ -179,10 +180,13 @@ void BlockingBaseCmd::ServeAndUnblockConns(void* args) {
auto bg_args = std::unique_ptr<UnblockTaskArgs>(static_cast<UnblockTaskArgs*>(args));
net::DispatchThread* dispatchThread = bg_args->dispatchThread;
std::shared_ptr<DB> db = bg_args->db;
std::shared_ptr<SyncMasterDB> sync_db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db->GetDBName()));
std::string key = std::move(bg_args->key);
auto& key_to_conns_ = dispatchThread->GetMapFromKeyToConns();
net::BlockKey blrPop_key{db->GetDBName(), key};

// [NOTICE]: within the following scope, which is guarded by 3 locks, no other lock should ever be acquired!
// WARNING: g_pika_server->GetDB() and g_pika_rm->GetSyncMasterDBByName() MUST NOT be called within this scope, or a deadlock will happen!
pstd::lock::ScopeRecordLock record_lock(db->LockMgr(), key);//It's a RAII Lock
db->DBLockShared();
std::unique_lock map_lock(dispatchThread->GetBlockMtx());// do not change the sequence of these 3 locks, or deadlock will happen
Expand Down Expand Up @@ -224,11 +228,13 @@ void BlockingBaseCmd::ServeAndUnblockConns(void* args) {
}
dispatchThread->CleanKeysAfterWaitNodeCleaned();
map_lock.unlock();
WriteBinlogOfPopAndUpdateCache(pop_binlog_args);
WriteBinlogOfPopAndUpdateCache(pop_binlog_args, db, sync_db);
db->DBUnlockShared();
}

void BlockingBaseCmd::WriteBinlogOfPopAndUpdateCache(std::vector<WriteBinlogOfPopArgs>& pop_args) {
void BlockingBaseCmd::WriteBinlogOfPopAndUpdateCache(std::vector<WriteBinlogOfPopArgs>& pop_args,
std::shared_ptr<DB> db,
std::shared_ptr<SyncMasterDB> sync_db) {
// write binlog of l/rpop
for (auto& pop_arg : pop_args) {
std::shared_ptr<Cmd> pop_cmd;
Expand All @@ -240,11 +246,10 @@ void BlockingBaseCmd::WriteBinlogOfPopAndUpdateCache(std::vector<WriteBinlogOfPo
pop_type = kCmdNameRPop;
pop_cmd = std::make_shared<RPopCmd>(kCmdNameRPop, 2, kCmdFlagsWrite | kCmdFlagsList);
}

PikaCmdArgsType args;
args.push_back(std::move(pop_type));
args.push_back(pop_arg.key);
pop_cmd->Initial(args, pop_arg.db->GetDBName());
pop_cmd->Initial(args, pop_arg.db->GetDBName(), std::move(db), std::move(sync_db));
pop_cmd->SetConn(pop_arg.conn);
auto resp_ptr = std::make_shared<std::string>("this resp won't be used for current code(consensus-level always be 0)");
pop_cmd->SetResp(resp_ptr);
Expand Down Expand Up @@ -398,7 +403,7 @@ void BLPopCmd::DoBinlog() {
}
std::vector<WriteBinlogOfPopArgs> args;
args.push_back(std::move(binlog_args_));
WriteBinlogOfPopAndUpdateCache(args);
WriteBinlogOfPopAndUpdateCache(args, db_, sync_db_);
}

void LPopCmd::DoInitial() {
Expand Down Expand Up @@ -732,7 +737,7 @@ void BRPopCmd::DoBinlog() {
}
std::vector<WriteBinlogOfPopArgs> args;
args.push_back(std::move(binlog_args_));
WriteBinlogOfPopAndUpdateCache(args);
WriteBinlogOfPopAndUpdateCache(args, db_, sync_db_);
}


Expand Down

0 comments on commit 3796bee

Please sign in to comment.