diff --git a/include/pika_dispatch_thread.h b/include/pika_dispatch_thread.h index 89dbb79333..4d48bf60bc 100644 --- a/include/pika_dispatch_thread.h +++ b/include/pika_dispatch_thread.h @@ -23,6 +23,7 @@ class PikaDispatchThread { void SetQueueLimit(int queue_limit) { thread_rep_->SetQueueLimit(queue_limit); } void UnAuthUserAndKillClient(const std::set &users, const std::shared_ptr& defaultUser); + net::ServerThread* server_thread() { return thread_rep_; } private: class ClientConnFactory : public net::ConnFactory { diff --git a/include/pika_server.h b/include/pika_server.h index 34145fc171..906a5be9fb 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -105,6 +105,9 @@ class PikaServer : public pstd::noncopyable { void SetForceFullSync(bool v); void SetDispatchQueueLimit(int queue_limit); storage::StorageOptions storage_options(); + std::unique_ptr& pika_dispatch_thread() { + return pika_dispatch_thread_; + } /* * DB use diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc index 4f372351f2..9ded4f7fd0 100644 --- a/src/pika_repl_bgworker.cc +++ b/src/pika_repl_bgworker.cc @@ -238,6 +238,26 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) { if (!c_ptr->IsSuspend()) { c_ptr->GetDB()->DbRWUnLock(); } + + if (c_ptr->res().ok() && c_ptr->is_write()) { + if (c_ptr->name() != kCmdNameFlushdb && c_ptr->name() != kCmdNameFlushall && c_ptr->name() != kCmdNameExec) { + auto dispatcher = dynamic_cast(g_pika_server->pika_dispatch_thread()->server_thread()); + if (dispatcher != nullptr) { + auto involved_conns = std::vector>{}; + auto table_keys = c_ptr->current_key(); + for (auto& key : table_keys) { + key = c_ptr->db_name().append(key); + } + involved_conns = dispatcher->GetInvolvedTxn(table_keys); + for (auto& conn : involved_conns) { + if (auto c = std::dynamic_pointer_cast(conn); c != nullptr) { + c->SetTxnWatchFailState(true); + } + } + } + } + } + record_lock.Unlock(c_ptr->current_key()); if (g_pika_conf->slowlog_slower_than() >= 0) { auto start_time = static_cast(start_us / 1000000);