Skip to content

Commit

Permalink
fix: pika slave support watch cmd
Browse files Browse the repository at this point in the history
#2425
Signed-off-by: HappyUncle <code4happy@gmail.com>
  • Loading branch information
happy-v587 committed Mar 6, 2024
1 parent 7c6184d commit 40d866d
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 0 deletions.
1 change: 1 addition & 0 deletions include/pika_dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class PikaDispatchThread {
void SetQueueLimit(int queue_limit) { thread_rep_->SetQueueLimit(queue_limit); }

void UnAuthUserAndKillClient(const std::set<std::string> &users, const std::shared_ptr<User>& defaultUser);
net::ServerThread* server_thread() { return thread_rep_; }

private:
class ClientConnFactory : public net::ConnFactory {
Expand Down
3 changes: 3 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ class PikaServer : public pstd::noncopyable {
void SetForceFullSync(bool v);
void SetDispatchQueueLimit(int queue_limit);
storage::StorageOptions storage_options();
std::unique_ptr<PikaDispatchThread>& pika_dispatch_thread() {
return pika_dispatch_thread_;
}

/*
* DB use
Expand Down
20 changes: 20 additions & 0 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,26 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
if (!c_ptr->IsSuspend()) {
c_ptr->GetDB()->DbRWUnLock();
}

if (c_ptr->res().ok() && c_ptr->is_write()) {
if (c_ptr->name() != kCmdNameFlushdb && c_ptr->name() != kCmdNameFlushall && c_ptr->name() != kCmdNameExec) {
auto dispatcher = dynamic_cast<net::DispatchThread*>(g_pika_server->pika_dispatch_thread()->server_thread());
if (dispatcher != nullptr) {
auto involved_conns = std::vector<std::shared_ptr<NetConn>>{};
auto table_keys = c_ptr->current_key();
for (auto& key : table_keys) {
key = c_ptr->db_name().append(key);
}
involved_conns = dispatcher->GetInvolvedTxn(table_keys);
for (auto& conn : involved_conns) {
if (auto c = std::dynamic_pointer_cast<PikaClientConn>(conn); c != nullptr) {
c->SetTxnWatchFailState(true);
}
}
}
}
}

record_lock.Unlock(c_ptr->current_key());
if (g_pika_conf->slowlog_slower_than() >= 0) {
auto start_time = static_cast<int32_t>(start_us / 1000000);
Expand Down

0 comments on commit 40d866d

Please sign in to comment.