Skip to content

Commit

Permalink
fix: successive exec of flushdb may cause delete old db fail (OpenAto…
Browse files Browse the repository at this point in the history
…mFoundation#2790)

* 1 add return flag for flushdb
2 add an atomic flag for flushdb
3 add callback function when deleted old path
4 revised some ugly code

* add timestamp for deleting suffix when flushdb

* remove debug
remove debug log

* change a return value to fixed "false"

* add failed reason to user resp

* removed duplicated logic

---------

Co-authored-by: cheniujh <1271435567@qq.com>
  • Loading branch information
cheniujh and cheniujh authored Jul 26, 2024
1 parent ddf955c commit dbdb4a4
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 47 deletions.
21 changes: 15 additions & 6 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,15 @@ class FlushallCmd : public Cmd {
void Split(const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushallCmd(*this); }
void FlushAllWithoutLock();
bool FlushAllWithoutLock();
void DoBinlog() override;

private:
void DoInitial() override;
void DoWithoutLock(std::shared_ptr<DB> db);
bool DoWithoutLock(std::shared_ptr<DB> db);
void Clear() override { flushall_succeed_ = false; }

bool flushall_succeed_{false};
};

class FlushdbCmd : public Cmd {
Expand All @@ -204,14 +208,19 @@ class FlushdbCmd : public Cmd {
void Split(const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushdbCmd(*this); }
void FlushAllDBsWithoutLock();
std::string GetFlushDBname() { return db_name_; }
void DoBinlog() override;
bool DoWithoutLock();

private:
std::string db_name_;
void DoInitial() override;
void Clear() override { db_name_.clear(); }
void DoWithoutLock();
void Clear() override {
db_name_.clear();
flush_succeed_ = false;
}

bool flush_succeed_{false};
std::string db_name_;
};

class ClientCmd : public Cmd {
Expand Down
109 changes: 72 additions & 37 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ void SelectCmd::Do() {
}

void FlushallCmd::DoInitial() {
flushall_succeed_ = false;
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameFlushall);
return;
Expand All @@ -510,13 +511,20 @@ void FlushallCmd::Do() {
for (const auto& db_item : g_pika_server->GetDB()) {
db_item.second->DBLock();
}
FlushAllWithoutLock();
flushall_succeed_ = FlushAllWithoutLock();
for (const auto& db_item : g_pika_server->GetDB()) {
db_item.second->DBUnlock();
}
g_pika_rm->DBUnlock();
if (res_.ok()) {
if (flushall_succeed_) {
res_.SetRes(CmdRes::kOk);
} else if (res_.ret() == CmdRes::kErrOther){
//flushdb failed and the res_ was set
} else {
//flushall failed, but res_ was not set
res_.SetRes(CmdRes::kErrOther,
"Flushall failed, maybe only some of the dbs successfully flushed while some not, check WARNING/ERROR log to know "
"more, you can try again moment later");
}
}

Expand All @@ -525,37 +533,55 @@ void FlushallCmd::DoThroughDB() {
}

void FlushallCmd::DoUpdateCache(std::shared_ptr<DB> db) {
if (!flushall_succeed_) {
//flushdb failed, also don't clear the cache
return;
}
// clear cache
if (PIKA_CACHE_NONE != g_pika_conf->cache_mode()) {
g_pika_server->ClearCacheDbAsync(db);
}
}

void FlushallCmd::FlushAllWithoutLock() {
bool FlushallCmd::FlushAllWithoutLock() {
for (const auto& db_item : g_pika_server->GetDB()) {
std::shared_ptr<DB> db = db_item.second;
DBInfo p_info(db->GetDBName());
if (g_pika_rm->GetSyncMasterDBs().find(p_info) == g_pika_rm->GetSyncMasterDBs().end()) {
res_.SetRes(CmdRes::kErrOther, "DB not found");
return;
LOG(ERROR) << p_info.db_name_ + " not found when flushall db";
res_.SetRes(CmdRes::kErrOther,p_info.db_name_ + " not found when flushall db");
return false;
}
DoWithoutLock(db);
}
if (res_.ok()) {
res_.SetRes(CmdRes::kOk);
bool success = DoWithoutLock(db);
if (!success) { return false; }
}
return true;
}

void FlushallCmd::DoWithoutLock(std::shared_ptr<DB> db) {
bool FlushallCmd::DoWithoutLock(std::shared_ptr<DB> db) {
if (!db) {
LOG(INFO) << "Flushall, but DB not found";
} else {
db->FlushDBWithoutLock();
DoUpdateCache(db);
LOG(ERROR) << "Flushall, but DB not found";
res_.SetRes(CmdRes::kErrOther,db->GetDBName() + " not found when flushall db");
return false;
}
bool success = db->FlushDBWithoutLock();
if (!success) {
// if the db is not flushed, return before clear the cache
res_.SetRes(CmdRes::kErrOther,db->GetDBName() + " flushall failed due to other Errors, please check Error/Warning log to know more");
return false;
}
DoUpdateCache(db);

return true;
}
void FlushallCmd::DoBinlog() {
if (flushall_succeed_) {
Cmd::DoBinlog();
}
}

void FlushdbCmd::DoInitial() {
flush_succeed_ = false;
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameFlushdb);
return;
Expand All @@ -570,16 +596,22 @@ void FlushdbCmd::DoInitial() {

void FlushdbCmd::Do() {
if (!db_) {
res_.SetRes(CmdRes::kInvalidDB);
res_.SetRes(CmdRes::kInvalidDB, "DB not found while flushdb");
return;
}
if (db_->IsKeyScaning()) {
res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later");
return;
}
std::lock_guard s_prw(g_pika_rm->GetDBLock());
std::lock_guard l_prw(db_->GetDBLock());
flush_succeed_ = DoWithoutLock();
if (flush_succeed_) {
res_.SetRes(CmdRes::kOk);
} else if (res_.ret() == CmdRes::kErrOther || res_.ret() == CmdRes::kInvalidParameter) {
//flushdb failed and res_ was set
} else {
if (db_->IsKeyScaning()) {
res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later");
} else {
std::lock_guard s_prw(g_pika_rm->GetDBLock());
std::lock_guard l_prw(db_->GetDBLock());
FlushAllDBsWithoutLock();
res_.SetRes(CmdRes::kOk);
}
res_.SetRes(CmdRes::kErrOther, "flushdb failed, maybe you cna try again later(check WARNING/ERROR log to know more)");
}
}

Expand All @@ -588,31 +620,34 @@ void FlushdbCmd::DoThroughDB() {
}

void FlushdbCmd::DoUpdateCache() {
if (!flush_succeed_) {
//if flushdb failed, also do not clear the cache
return;
}
// clear cache
if (g_pika_conf->cache_mode() != PIKA_CACHE_NONE) {
g_pika_server->ClearCacheDbAsync(db_);
}
}

void FlushdbCmd::FlushAllDBsWithoutLock() {
bool FlushdbCmd::DoWithoutLock() {
if (!db_) {
LOG(ERROR) << db_name_ << " Flushdb, but DB not found";
res_.SetRes(CmdRes::kErrOther, db_name_ + " Flushdb, but DB not found");
return false;
}
DBInfo p_info(db_->GetDBName());
if (g_pika_rm->GetSyncMasterDBs().find(p_info) == g_pika_rm->GetSyncMasterDBs().end()) {
res_.SetRes(CmdRes::kErrOther, "DB not found");
return;
LOG(ERROR) << "DB not found when flushing " << db_->GetDBName();
res_.SetRes(CmdRes::kErrOther, db_->GetDBName() + " Flushdb, but DB not found");
return false;
}
DoWithoutLock();
return db_->FlushDBWithoutLock();
}

void FlushdbCmd::DoWithoutLock() {
if (!db_) {
LOG(INFO) << "Flushdb, but DB not found";
} else {
if (db_name_ == "all") {
db_->FlushDBWithoutLock();
} else {
//Floyd does not support flushdb by type
LOG(ERROR) << "cannot flushdb by type in floyd";
}
void FlushdbCmd::DoBinlog() {
if (flush_succeed_) {
Cmd::DoBinlog();
}
}

Expand Down
14 changes: 11 additions & 3 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,23 @@ bool DB::FlushDBWithoutLock() {
if (dbpath[dbpath.length() - 1] == '/') {
dbpath.erase(dbpath.length() - 1);
}
dbpath.append("_deleting/");
pstd::RenameFile(db_path_, dbpath);

std::string delete_suffix("_deleting_");
delete_suffix.append(std::to_string(NowMicros()));
delete_suffix.append("/");
dbpath.append(delete_suffix);
auto rename_success = pstd::RenameFile(db_path_, dbpath);
storage_ = std::make_shared<storage::Storage>(g_pika_conf->db_instance_num(),
g_pika_conf->default_slot_num(), g_pika_conf->classic_mode());
rocksdb::Status s = storage_->Open(g_pika_server->storage_options(), db_path_);
assert(storage_);
assert(s.ok());
if (rename_success == -1) {
//the storage_->Open actually opened old RocksDB instance, so flushdb failed
LOG(WARNING) << db_name_ << " FlushDB failed due to rename old db_path_ failed";
return false;
}
LOG(INFO) << db_name_ << " Open new db success";

g_pika_server->PurgeDir(dbpath);
return true;
}
Expand Down
2 changes: 2 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ void DoPurgeDir(void* arg) {
LOG(INFO) << "Delete dir: " << *path << " done";
}


PikaServer::PikaServer()
: exit_(false),
slow_cmd_thread_pool_flag_(g_pika_conf->slow_cmd_pool()),
Expand Down Expand Up @@ -794,6 +795,7 @@ void PikaServer::PurgeDir(const std::string& path) {
PurgeDirTaskSchedule(&DoPurgeDir, static_cast<void*>(dir_path));
}


void PikaServer::PurgeDirTaskSchedule(void (*function)(void*), void* arg) {
purge_thread_.set_thread_name("PurgeDirTask");
purge_thread_.StartThread();
Expand Down
2 changes: 1 addition & 1 deletion src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void ExecCmd::Do() {
client_conn->SetAllTxnFailed();
} else if (cmd->name() == kCmdNameFlushdb) {
auto flushdb = std::dynamic_pointer_cast<FlushdbCmd>(cmd);
flushdb->FlushAllDBsWithoutLock();
flushdb->DoWithoutLock();
if (cmd->res().ok()) {
cmd->res().SetRes(CmdRes::kOk);
}
Expand Down

0 comments on commit dbdb4a4

Please sign in to comment.