Skip to content

Commit

Permalink
fix: flushdb and flushall bug (#2533)
Browse files Browse the repository at this point in the history
* fix flushdb bug
* Updated flushall write Binlog logic
* delete flushsubdb
* add flushall cache clear

---------
  • Loading branch information
Mixficsol authored and brother-jin committed Apr 8, 2024
1 parent d93dd02 commit bf5e97c
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 202 deletions.
8 changes: 1 addition & 7 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,14 @@ class FlushallCmd : public Cmd {
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::KEYSPACE)) {}
void Do() override;
void DoThroughDB() override;
void DoUpdateCache() override;
void DoUpdateCache(std::shared_ptr<DB> db);
void Split(const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushallCmd(*this); }
void Execute() override;
void FlushAllWithoutLock();
void DoBinlog(std::shared_ptr<SyncMasterDB> sync_db_);

private:
void DoInitial() override;
std::string ToRedisProtocol() override;
void DoWithoutLock(std::shared_ptr<DB> db);
};

Expand All @@ -212,7 +209,6 @@ class FlushdbCmd : public Cmd {
void Merge() override{};
Cmd* Clone() override { return new FlushdbCmd(*this); }
void FlushAllDBsWithoutLock();
void Execute() override;
std::string GetFlushDBname() { return db_name_; }

private:
Expand Down Expand Up @@ -265,7 +261,6 @@ class InfoCmd : public Cmd {
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new InfoCmd(*this); }
void Execute() override;

private:
InfoSection info_section_;
Expand Down Expand Up @@ -333,7 +328,6 @@ class ConfigCmd : public Cmd {
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new ConfigCmd(*this); }
void Execute() override;

private:
std::vector<std::string> config_args_v_;
Expand Down
5 changes: 1 addition & 4 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,9 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
void Init();
bool TryUpdateMasterOffset();
/*
* FlushDB & FlushSubDB use
* FlushDB used
*/
bool FlushDB();
bool FlushSubDB(const std::string& db_name);
bool FlushDBWithoutLock();
bool FlushSubDBWithoutLock(const std::string& db_name);
bool ChangeDb(const std::string& new_path);
pstd::Status GetBgSaveUUID(std::string* snapshot_uuid);
void PrepareRsync();
Expand Down
2 changes: 0 additions & 2 deletions include/pika_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class ExecCmd : public Cmd {
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::TRANSACTION)) {}
void Do() override;
Cmd* Clone() override { return new ExecCmd(*this); }
void Execute() override;
void Split(const HintKeys& hint_keys) override {}
void Merge() override {}
std::vector<std::string> current_key() const override { return {}; }
Expand Down Expand Up @@ -79,7 +78,6 @@ class WatchCmd : public Cmd {
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::TRANSACTION)) {}

void Do() override;
void Execute() override;
void Split(const HintKeys& hint_keys) override {}
Cmd* Clone() override { return new WatchCmd(*this); }
void Merge() override {}
Expand Down
114 changes: 22 additions & 92 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -525,39 +525,8 @@ void FlushallCmd::DoInitial() {
return;
}
}
void FlushallCmd::Do() {
if (!db_) {
LOG(INFO) << "Flushall, but DB not found";
} else {
db_->FlushDB();
}
}

void FlushallCmd::DoThroughDB() {
Do();
}

void FlushallCmd::DoUpdateCache() {
// clear cache
if (PIKA_CACHE_NONE != g_pika_conf->cache_model()) {
g_pika_server->ClearCacheDbAsync(db_);
}
}

// flushall convert flushdb writes to every db binlog
std::string FlushallCmd::ToRedisProtocol() {
std::string content;
content.reserve(RAW_ARGS_LEN);
RedisAppendLen(content, 1, "*");

// to flushdb cmd
std::string flushdb_cmd("flushdb");
RedisAppendLenUint64(content, flushdb_cmd.size(), "$");
RedisAppendContent(content, flushdb_cmd);
return content;
}

void FlushallCmd::Execute() {
void FlushallCmd::Do() {
std::lock_guard l_trw(g_pika_server->GetDBLock());
for (const auto& db_item : g_pika_server->GetDB()) {
if (db_item.second->IsKeyScaning()) {
Expand All @@ -579,6 +548,17 @@ void FlushallCmd::Execute() {
}
}

void FlushallCmd::DoThroughDB() {
Do();
}

void FlushallCmd::DoUpdateCache(std::shared_ptr<DB> db) {
// clear cache
if (PIKA_CACHE_NONE != g_pika_conf->cache_model()) {
g_pika_server->ClearCacheDbAsync(db);
}
}

void FlushallCmd::FlushAllWithoutLock() {
for (const auto& db_item : g_pika_server->GetDB()) {
std::shared_ptr<DB> db = db_item.second;
Expand All @@ -588,45 +568,18 @@ void FlushallCmd::FlushAllWithoutLock() {
return;
}
DoWithoutLock(db);
DoBinlog(g_pika_rm->GetSyncMasterDBs()[p_info]);
}
if (res_.ok()) {
res_.SetRes(CmdRes::kOk);
}
}

void FlushallCmd::DoBinlog(std::shared_ptr<SyncMasterDB> sync_db) {
if (res().ok() && is_write() && g_pika_conf->write_binlog()) {
std::shared_ptr<net::NetConn> conn_ptr = GetConn();
std::shared_ptr<std::string> resp_ptr = GetResp();
// Consider that dummy cmd appended by system, both conn and resp are null.
if ((!conn_ptr || !resp_ptr) && (name_ != kCmdDummy)) {
if (!conn_ptr) {
LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " conn empty.";
}
if (!resp_ptr) {
LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " resp empty.";
}
res().SetRes(CmdRes::kErrOther);
return;
}

Status s = sync_db->ConsensusProposeLog(shared_from_this());
if (!s.ok()) {
LOG(WARNING) << sync_db->SyncDBInfo().ToString() << " Writing binlog failed, maybe no space left on device "
<< s.ToString();
res().SetRes(CmdRes::kErrOther, s.ToString());
return;
}
}
}

void FlushallCmd::DoWithoutLock(std::shared_ptr<DB> db) {
if (!db) {
LOG(INFO) << "Flushall, but DB not found";
} else {
db->FlushDBWithoutLock();
DoUpdateCache();
DoUpdateCache(db);
}
}

Expand Down Expand Up @@ -657,17 +610,19 @@ void FlushdbCmd::DoInitial() {

void FlushdbCmd::Do() {
if (!db_) {
LOG(INFO) << "Flushdb, but DB not found";
res_.SetRes(CmdRes::kInvalidDB);
} else {
if (db_name_ == "all") {
db_->FlushDB();
if (db_->IsKeyScaning()) {
res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later");
} else {
db_->FlushSubDB(db_name_);
std::lock_guard s_prw(g_pika_rm->GetDBLock());
std::lock_guard l_prw(db_->GetDBLock());
FlushAllDBsWithoutLock();
res_.SetRes(CmdRes::kOk);
}
}
}


void FlushdbCmd::DoThroughDB() {
Do();
}
Expand All @@ -686,7 +641,6 @@ void FlushdbCmd::FlushAllDBsWithoutLock() {
return;
}
DoWithoutLock();
DoBinlog();
}

void FlushdbCmd::DoWithoutLock() {
Expand All @@ -696,23 +650,8 @@ void FlushdbCmd::DoWithoutLock() {
if (db_name_ == "all") {
db_->FlushDBWithoutLock();
} else {
db_->FlushSubDBWithoutLock(db_name_);
}
DoUpdateCache();
}
}

void FlushdbCmd::Execute() {
if (!db_) {
res_.SetRes(CmdRes::kInvalidDB);
} else {
if (db_->IsKeyScaning()) {
res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later");
} else {
std::lock_guard l_prw(db_->GetDBLock());
std::lock_guard s_prw(g_pika_rm->GetDBLock());
FlushAllDBsWithoutLock();
res_.SetRes(CmdRes::kOk);
//Floyd does not support flushdb by type
LOG(ERROR) << "cannot flushdb by type in floyd";
}
}
}
Expand Down Expand Up @@ -1474,11 +1413,6 @@ std::string InfoCmd::CacheStatusToString(int status) {
}
}

void InfoCmd::Execute() {
std::shared_ptr<DB> db = g_pika_server->GetDB(db_name_);
Do();
}

void ConfigCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameConfig);
Expand Down Expand Up @@ -2705,10 +2639,6 @@ void ConfigCmd::ConfigResetstat(std::string& ret) {
ret = "+OK\r\n";
}

void ConfigCmd::Execute() {
Do();
}

void MonitorCmd::DoInitial() {
if (argv_.size() != 1) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameMonitor);
Expand Down
39 changes: 1 addition & 38 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ DisplayCacheInfo DB::GetCacheInfo() {
}

bool DB::FlushDBWithoutLock() {
std::lock_guard l(bgsave_protector_);
if (bgsave_info_.bgsaving) {
return false;
}
Expand All @@ -223,33 +224,6 @@ bool DB::FlushDBWithoutLock() {
return true;
}

bool DB::FlushSubDBWithoutLock(const std::string& db_name) {
std::lock_guard l(bgsave_protector_);
if (bgsave_info_.bgsaving) {
return false;
}

LOG(INFO) << db_name_ << " Delete old " + db_name + " db...";
storage_.reset();

std::string dbpath = db_path_;
if (dbpath[dbpath.length() - 1] != '/') {
dbpath.append("/");
}

std::string sub_dbpath = dbpath + db_name;
std::string del_dbpath = dbpath + db_name + "_deleting";
pstd::RenameFile(sub_dbpath, del_dbpath);

storage_ = std::make_shared<storage::Storage>();
rocksdb::Status s = storage_->Open(g_pika_server->storage_options(), db_path_);
assert(storage_);
assert(s.ok());
LOG(INFO) << db_name_ << " open new " + db_name + " db success";
g_pika_server->PurgeDir(del_dbpath);
return true;
}

void DB::DoBgSave(void* arg) {
std::unique_ptr<BgTaskArg> bg_task_arg(static_cast<BgTaskArg*>(arg));

Expand Down Expand Up @@ -572,11 +546,6 @@ void DB::ClearBgsave() {
bgsave_info_.Clear();
}

bool DB::FlushSubDB(const std::string& db_name) {
std::lock_guard rwl(dbs_rw_);
return FlushSubDBWithoutLock(db_name);
}

void DB::UpdateCacheInfo(CacheInfo& cache_info) {
std::unique_lock<std::shared_mutex> lock(cache_info_rwlock_);

Expand Down Expand Up @@ -625,9 +594,3 @@ void DB::ResetDisplayCacheInfo(int status) {
cache_info_.waitting_load_keys_num = 0;
cache_usage_ = 0;
}

bool DB::FlushDB() {
std::lock_guard rwl(dbs_rw_);
std::lock_guard l(bgsave_protector_);
return FlushDBWithoutLock();
}
40 changes: 16 additions & 24 deletions src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ void MultiCmd::DoInitial() {
void ExecCmd::Do() {
auto conn = GetConn();
auto client_conn = std::dynamic_pointer_cast<PikaClientConn>(conn);
if (client_conn == nullptr) {
res_.SetRes(CmdRes::kErrOther, name());
return;
}
if (!client_conn->IsInTxn()) {
res_.SetRes(CmdRes::kErrOther, "EXEC without MULTI");
return;
}
if (IsTxnFailedAndSetState()) {
client_conn->ExitTxn();
return;
}
SetCmdsVec();
Lock();
conn = GetConn();
client_conn = std::dynamic_pointer_cast<PikaClientConn>(conn);
std::vector<CmdRes> res_vec = {};
std::vector<std::shared_ptr<std::string>> resp_strs;
for (size_t i = 0; i < cmds_.size(); ++i) {
Expand Down Expand Up @@ -84,26 +100,6 @@ void ExecCmd::Do() {
for (auto &r : res_vec) {
res_.AppendStringRaw(r.message());
}
}

void ExecCmd::Execute() {
auto conn = GetConn();
auto client_conn = std::dynamic_pointer_cast<PikaClientConn>(conn);
if (client_conn == nullptr) {
res_.SetRes(CmdRes::kErrOther, name());
return;
}
if (!client_conn->IsInTxn()) {
res_.SetRes(CmdRes::kErrOther, "ERR EXEC without MULTI");
return;
}
if (IsTxnFailedAndSetState()) {
client_conn->ExitTxn();
return;
}
SetCmdsVec();
Lock();
Do();
Unlock();
ServeToBLrPopWithKeys();
list_cmd_.clear();
Expand Down Expand Up @@ -246,10 +242,6 @@ void WatchCmd::Do() {
res_.SetRes(CmdRes::kOk);
}

void WatchCmd::Execute() {
Do();
}

void WatchCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, name());
Expand Down
Loading

0 comments on commit bf5e97c

Please sign in to comment.