Skip to content

Commit

Permalink
fix deadlock that cause by DB::dbs_rw_ and PikaReplicationManager::db…
Browse files Browse the repository at this point in the history
…s_rw_
  • Loading branch information
cheniujh committed Aug 9, 2024
1 parent 46ac102 commit 2919b93
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 7 deletions.
5 changes: 5 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,11 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
int8_t SubCmdIndex(const std::string& cmdName); // if the command no subCommand,return -1;

void Initial(const PikaCmdArgsType& argv, const std::string& db_name);
//sometimes, some cmd need to be initialed in non-exectute path and being in the scope of some other locks.
//but g_pika_server->GetDB() and g_pika_server->GetSyncMasterDB() all need use big lock (log big mutex)
//to avoid the risk of deadlock, in this scenario, we can GetDB and GetSyncMasterDB before enter into lock scope
//and just pass them as args(thus, no big lock happened within Initial)
void Initial(const PikaCmdArgsType& argv, const std::string& db_name, std::shared_ptr<DB> db, std::shared_ptr<SyncMasterDB> sync_db);
uint32_t flag() const;
bool hasFlag(uint32_t flag) const;
bool is_read() const;
Expand Down
5 changes: 4 additions & 1 deletion include/pika_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
#ifndef PIKA_LIST_H_
#define PIKA_LIST_H_

#include <memory>
#include "include/acl.h"
#include "include/pika_command.h"
#include "pika_server.h"
#include "storage/storage.h"

/*
Expand Down Expand Up @@ -105,7 +107,8 @@ class BlockingBaseCmd : public Cmd {
void BlockThisClientToWaitLRPush(BlockKeyType block_pop_type, std::vector<std::string>& keys, int64_t expire_time);
void TryToServeBLrPopWithThisKey(const std::string& key, std::shared_ptr<DB> db);
static void ServeAndUnblockConns(void* args);
static void WriteBinlogOfPopAndUpdateCache(std::vector<WriteBinlogOfPopArgs>& pop_args);
static void WriteBinlogOfPopAndUpdateCache(std::vector<WriteBinlogOfPopArgs>& pop_args, std::shared_ptr<DB> db,
std::shared_ptr<SyncMasterDB> sync_db);
void removeDuplicates(std::vector<std::string>& keys_);
// blpop/brpop used functions end
};
Expand Down
11 changes: 11 additions & 0 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,17 @@ void Cmd::Initial(const PikaCmdArgsType& argv, const std::string& db_name) {
DoInitial();
};

void Cmd::Initial(const PikaCmdArgsType& argv, const std::string& db_name, std::shared_ptr<DB> db,
std::shared_ptr<SyncMasterDB> sync_db) {
argv_ = argv;
db_name_ = db_name;
res_.clear(); // Clear res content
db_ = std::move(db);
sync_db_ = std::move(sync_db);
Clear(); // Clear cmd, Derived class can has own implement
DoInitial();
}

std::vector<std::string> Cmd::current_key() const { return {""}; }

void Cmd::Execute() {
Expand Down
17 changes: 11 additions & 6 deletions src/pika_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// of patent rights can be found in the PATENTS file in the same directory.

#include "include/pika_list.h"
#include <memory>
#include <utility>
#include "include/pika_cache.h"
#include "include/pika_data_distribution.h"
Expand Down Expand Up @@ -179,10 +180,13 @@ void BlockingBaseCmd::ServeAndUnblockConns(void* args) {
auto bg_args = std::unique_ptr<UnblockTaskArgs>(static_cast<UnblockTaskArgs*>(args));
net::DispatchThread* dispatchThread = bg_args->dispatchThread;
std::shared_ptr<DB> db = bg_args->db;
std::shared_ptr<SyncMasterDB> sync_db = g_pika_rm->GetSyncMasterDBByName(DBInfo(db->GetDBName()));
std::string key = std::move(bg_args->key);
auto& key_to_conns_ = dispatchThread->GetMapFromKeyToConns();
net::BlockKey blrPop_key{db->GetDBName(), key};

//[NOTICE]: within the next scope formed by 3 locks, other locks should never be acquired !
//WARNING: g_pika_server->GetDB(), g_pika_rm->GetSynMasterDB SHOULD NOT BE CALLED within the scope, or deadlock will happen !
pstd::lock::ScopeRecordLock record_lock(db->LockMgr(), key);//It's a RAII Lock
db->DBLockShared();
std::unique_lock map_lock(dispatchThread->GetBlockMtx());// do not change the sequence of these 3 locks, or deadlock will happen
Expand Down Expand Up @@ -224,11 +228,13 @@ void BlockingBaseCmd::ServeAndUnblockConns(void* args) {
}
dispatchThread->CleanKeysAfterWaitNodeCleaned();
map_lock.unlock();
WriteBinlogOfPopAndUpdateCache(pop_binlog_args);
WriteBinlogOfPopAndUpdateCache(pop_binlog_args, db, sync_db);
db->DBUnlockShared();
}

void BlockingBaseCmd::WriteBinlogOfPopAndUpdateCache(std::vector<WriteBinlogOfPopArgs>& pop_args) {
void BlockingBaseCmd::WriteBinlogOfPopAndUpdateCache(std::vector<WriteBinlogOfPopArgs>& pop_args,
std::shared_ptr<DB> db,
std::shared_ptr<SyncMasterDB> sync_db) {
// write binlog of l/rpop
for (auto& pop_arg : pop_args) {
std::shared_ptr<Cmd> pop_cmd;
Expand All @@ -240,11 +246,10 @@ void BlockingBaseCmd::WriteBinlogOfPopAndUpdateCache(std::vector<WriteBinlogOfPo
pop_type = kCmdNameRPop;
pop_cmd = std::make_shared<RPopCmd>(kCmdNameRPop, 2, kCmdFlagsWrite | kCmdFlagsList);
}

PikaCmdArgsType args;
args.push_back(std::move(pop_type));
args.push_back(pop_arg.key);
pop_cmd->Initial(args, pop_arg.db->GetDBName());
pop_cmd->Initial(args, pop_arg.db->GetDBName(), std::move(db), std::move(sync_db));
pop_cmd->SetConn(pop_arg.conn);
auto resp_ptr = std::make_shared<std::string>("this resp won't be used for current code(consensus-level always be 0)");
pop_cmd->SetResp(resp_ptr);
Expand Down Expand Up @@ -398,7 +403,7 @@ void BLPopCmd::DoBinlog() {
}
std::vector<WriteBinlogOfPopArgs> args;
args.push_back(std::move(binlog_args_));
WriteBinlogOfPopAndUpdateCache(args);
WriteBinlogOfPopAndUpdateCache(args, db_, sync_db_);
}

void LPopCmd::DoInitial() {
Expand Down Expand Up @@ -732,7 +737,7 @@ void BRPopCmd::DoBinlog() {
}
std::vector<WriteBinlogOfPopArgs> args;
args.push_back(std::move(binlog_args_));
WriteBinlogOfPopAndUpdateCache(args);
WriteBinlogOfPopAndUpdateCache(args, db_, sync_db_);
}


Expand Down

0 comments on commit 2919b93

Please sign in to comment.