diff --git a/src/pika_migrate_thread.cc b/src/pika_migrate_thread.cc index dbd9d1516f..542868ad72 100644 --- a/src/pika_migrate_thread.cc +++ b/src/pika_migrate_thread.cc @@ -595,7 +595,7 @@ bool PikaMigrateThread::ReqMigrateBatch(const std::string &ip, int64_t port, int return false; } -int PikaMigrateThread::ReqMigrateOne(const std::string& key, const std::shared_ptr& db) { +int PikaMigrateThread::ReqMigrateOne(const std::string &key, const std::shared_ptr &db) { std::unique_lock lm(migrator_mutex_); int slot_id = GetSlotID(key); @@ -631,7 +631,7 @@ int PikaMigrateThread::ReqMigrateOne(const std::string& key, const std::shared_p if (slot_id != slot_id_) { LOG(WARNING) << "PikaMigrateThread::ReqMigrateOne Slot : " << slot_id << " is not the migrating slot:" << slot_id_; - return -2; + return -1; } // if the migrate thread exit, start it @@ -648,17 +648,16 @@ int PikaMigrateThread::ReqMigrateOne(const std::string& key, const std::shared_p is_migrating_ = true; usleep(100); } + } + // check the key is migrating + std::pair kpair = std::make_pair(key_type, key); + if (IsMigrating(kpair)) { + LOG(INFO) << "PikaMigrateThread::ReqMigrateOne key: " << key << " is migrating ! "; + return 1; } else { - // check the key is migrating - std::pair kpair = std::make_pair(key_type, key); - if (IsMigrating(kpair)) { - LOG(INFO) << "PikaMigrateThread::ReqMigrateOne key: " << key << " is migrating ! "; - return 1; - } else { - std::unique_lock lo(mgrtone_queue_mutex_); - mgrtone_queue_.emplace_back(kpair); - NotifyRequestMigrate(); - } + std::unique_lock lo(mgrtone_queue_mutex_); + mgrtone_queue_.emplace_back(kpair); + NotifyRequestMigrate(); } return 1; @@ -907,7 +906,9 @@ void *PikaMigrateThread::ThreadMain() { { std::unique_lock lw(workers_mutex_); while (!should_exit_ && is_task_success_ && send_num_ != response_num_) { - workers_cond_.wait(lw); + if (workers_cond_.wait_for(lw, std::chrono::seconds(60)) == std::cv_status::timeout) { + break; + } } } LOG(INFO) << "PikaMigrateThread::ThreadMain send_num:" << send_num_ << " response_num:" << response_num_; diff --git a/src/pika_slot_command.cc b/src/pika_slot_command.cc index 0e3153213a..dc7f07e73d 100644 --- a/src/pika_slot_command.cc +++ b/src/pika_slot_command.cc @@ -1513,7 +1513,6 @@ void SlotsMgrtExecWrapperCmd::Do() { int ret = g_pika_server->SlotsMigrateOne(key_, db_); switch (ret) { case 0: - case -2: res_.AppendInteger(0); res_.AppendInteger(0); return;