Skip to content

Commit

Permalink
fix conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyuecai committed Aug 5, 2024
2 parents 4355400 + a2acd88 commit bb88f82
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 19 deletions.
7 changes: 6 additions & 1 deletion conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ rocksdb-periodic-second : 86400 * 3;
# Master's run-id
# master-run-id :

# The number of threads for running Pika.
# The number of Net-worker threads in Pika.
# It's not recommended to set this value exceeds
# the number of CPU cores on the deployment server.
thread-num : 1

# use Net worker thread to read redis Cache for [Get, HGet] command,
# which can significantly improve QPS and reduce latency when cache hit rate is high
# default value is "yes", set it to "no" if you wanna disable it
rtc-cache-read : yes

# Size of the thread pool, The threads within this pool
# are dedicated to handling user requests.
thread-pool-size : 12
Expand Down
7 changes: 4 additions & 3 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class PikaClientConn : public net::RedisConn {
std::shared_ptr<std::string> resp_ptr;
LogOffset offset;
std::string db_name;
bool cache_miss_in_rtc_;
};

struct TxnStateBitMask {
Expand All @@ -78,7 +79,7 @@ class PikaClientConn : public net::RedisConn {
void ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async, std::string* response) override;

bool ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt);
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs);
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs, bool cache_miss_in_rtc);
int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; }
static void DoBackgroundTask(void* arg);

Expand Down Expand Up @@ -136,12 +137,12 @@ class PikaClientConn : public net::RedisConn {
std::shared_ptr<User> user_;

std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr);
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);

void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration);
void ProcessMonitor(const PikaCmdArgsType& argv);

void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr);
void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
void TryWriteResp();
};

Expand Down
4 changes: 4 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,9 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
uint32_t GetCmdId() const { return cmdId_; };
bool CheckArg(uint64_t num) const;

bool IsCacheMissedInRtc() const;
void SetCacheMissedInRtc(bool value);

void DoCommand(const HintKeys& hint_key = HintKeys());

protected:
Expand Down Expand Up @@ -604,6 +607,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
uint64_t do_duration_ = 0;
uint32_t cmdId_ = 0;
uint32_t aclCategory_ = 0;
bool cache_missed_in_rtc_{false};

private:
virtual void DoInitial() = 0;
Expand Down
2 changes: 2 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ class PikaConf : public pstd::BaseConf {

// Immutable config items, we don't use lock.
bool daemonize() { return daemonize_; }
bool rtc_cache_read_enabled() { return rtc_cache_read_enabled_; }
std::string pidfile() { return pidfile_; }
int binlog_file_size() { return binlog_file_size_; }
std::vector<rocksdb::CompressionType> compression_per_level();
Expand Down Expand Up @@ -930,6 +931,7 @@ class PikaConf : public pstd::BaseConf {
int level0_file_num_compaction_trigger_ = 4;
int64_t max_client_response_size_ = 0;
bool daemonize_ = false;
bool rtc_cache_read_enabled_ = false;
int timeout_ = 0;
std::string server_id_;
std::string run_id_;
Expand Down
24 changes: 13 additions & 11 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ PikaClientConn::PikaClientConn(int fd, const std::string& ip_port, net::Thread*
}

std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr) {
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc) {
// Get command info
std::shared_ptr<Cmd> c_ptr = g_pika_cmd_table_manager->GetCmd(opt);
if (!c_ptr) {
Expand All @@ -47,6 +47,7 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
}
return tmp_ptr;
}
c_ptr->SetCacheMissedInRtc(cache_miss_in_rtc);
c_ptr->SetConn(shared_from_this());
c_ptr->SetResp(resp_ptr);

Expand Down Expand Up @@ -273,6 +274,7 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
time_stat_->Reset();
if (async) {
auto arg = new BgTaskArg();
arg->cache_miss_in_rtc_ = false;
arg->redis_cmds = argvs;
time_stat_->enqueue_ts_ = time_stat_->before_queue_ts_ = pstd::NowMicros();
arg->conn_ptr = std::dynamic_pointer_cast<PikaClientConn>(shared_from_this());
Expand All @@ -288,21 +290,23 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt);

//we don't intercept pipeline batch (argvs.size() > 1)
if (argvs.size() == 1 && IsInterceptedByRTC(opt) &&
if (g_pika_conf->rtc_cache_read_enabled() &&
argvs.size() == 1 && IsInterceptedByRTC(opt) &&
PIKA_CACHE_NONE != g_pika_conf->cache_mode() &&
!IsInTxn()) {
// read in cache
if (ReadCmdInCache(argvs[0], opt)) {
delete arg;
return;
}
arg->cache_miss_in_rtc_ = true;
time_stat_->before_queue_ts_ = pstd::NowMicros();
}

g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd);
return;
}
BatchExecRedisCmd(argvs);
BatchExecRedisCmd(argvs, false);
}

void PikaClientConn::DoBackgroundTask(void* arg) {
Expand All @@ -320,15 +324,15 @@ void PikaClientConn::DoBackgroundTask(void* arg) {
}
}

conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds);
conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds, bg_arg->cache_miss_in_rtc_);
}

void PikaClientConn::BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs) {
void PikaClientConn::BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs, bool cache_miss_in_rtc) {
resp_num.store(static_cast<int32_t>(argvs.size()));
for (const auto& argv : argvs) {
std::shared_ptr<std::string> resp_ptr = std::make_shared<std::string>();
resp_array.push_back(resp_ptr);
ExecRedisCmd(argv, resp_ptr);
ExecRedisCmd(argv, resp_ptr, cache_miss_in_rtc);
}
time_stat_->process_done_ts_ = pstd::NowMicros();
TryWriteResp();
Expand Down Expand Up @@ -363,9 +367,6 @@ bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std
return false;
}
//only read command(Get, HGet) will reach here, no need of record lock
if (c_ptr->db_->cache()->CacheStatus() != PIKA_CACHE_STATUS_OK) {
return false;
}
bool read_status = c_ptr->DoReadCommandInCache();
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
resp_num--;
Expand Down Expand Up @@ -508,7 +509,8 @@ void PikaClientConn::ExitTxn() {
}
}

void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr) {
void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr,
bool cache_miss_in_rtc) {
// get opt
std::string opt = argv[0];
pstd::StringToLower(opt);
Expand All @@ -519,7 +521,7 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<s
}
}

std::shared_ptr<Cmd> cmd_ptr = DoCmd(argv, opt, resp_ptr);
std::shared_ptr<Cmd> cmd_ptr = DoCmd(argv, opt, resp_ptr, cache_miss_in_rtc);
*resp_ptr = std::move(cmd_ptr->res().message());
resp_num--;
}
Expand Down
10 changes: 7 additions & 3 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ Cmd* GetCmdFromDB(const std::string& opt, const CmdTable& cmd_table) {
bool Cmd::CheckArg(uint64_t num) const { return !((arity_ > 0 && num != arity_) || (arity_ < 0 && num < -arity_)); }

Cmd::Cmd(std::string name, int arity, uint32_t flag, uint32_t aclCategory)
: name_(std::move(name)), arity_(arity), flag_(flag), aclCategory_(aclCategory) {
: name_(std::move(name)), arity_(arity), flag_(flag), aclCategory_(aclCategory), cache_missed_in_rtc_(false) {
}

void Cmd::Initial(const PikaCmdArgsType& argv, const std::string& db_name) {
Expand Down Expand Up @@ -891,10 +891,12 @@ void Cmd::DoCommand(const HintKeys& hint_keys) {
if (IsNeedCacheDo()
&& PIKA_CACHE_NONE != g_pika_conf->cache_mode()
&& db_->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
if (IsNeedReadCache()) {
if (!cache_missed_in_rtc_
&& IsNeedReadCache()) {
ReadCache();
}
if (is_read() && res().CacheMiss()) {
if (is_read()
&& (res().CacheMiss() || cache_missed_in_rtc_)) {
pstd::lock::MultiScopeRecordLock record_lock(db_->LockMgr(), current_key());
DoThroughDB();
if (IsNeedUpdateCache()) {
Expand Down Expand Up @@ -1064,3 +1066,5 @@ void Cmd::SetResp(const std::shared_ptr<std::string>& resp) { resp_ = resp; }
std::shared_ptr<std::string> Cmd::GetResp() { return resp_.lock(); }

void Cmd::SetStage(CmdStage stage) { stage_ = stage; }
bool Cmd::IsCacheMissedInRtc() const { return cache_missed_in_rtc_; }
void Cmd::SetCacheMissedInRtc(bool value) { cache_missed_in_rtc_ = value; }
5 changes: 5 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,11 @@ int PikaConf::Load() {
GetConfStr("daemonize", &dmz);
daemonize_ = dmz == "yes";

// read redis cache in Net worker threads
std::string rtc_enabled;
GetConfStr("rtc-cache-read", &rtc_enabled);
rtc_cache_read_enabled_ = rtc_enabled != "no";

// binlog
std::string wb;
GetConfStr("write-binlog", &wb);
Expand Down
3 changes: 2 additions & 1 deletion src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ Status Redis::HIncrby(const Slice& key, const Slice& field, int64_t value, int64
batch.Put(handles_[kMetaCF], base_meta_key.Encode(), meta_value);
HashesDataKey hashes_data_key(key, version, field);
Int64ToStr(value_buf, 32, value);
batch.Put(handles_[kHashesDataCF], hashes_data_key.Encode(), value_buf);
BaseDataValue internal_value(value_buf);
batch.Put(handles_[kHashesDataCF], hashes_data_key.Encode(), internal_value.Encode());
*ret = value;
} else {
version = parsed_hashes_meta_value.Version();
Expand Down
21 changes: 21 additions & 0 deletions tests/integration/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,27 @@ var _ = Describe("Hash Commands", func() {
Expect(hIncrBy.Val()).To(Equal(int64(-5)))
})

It("should HIncrBy against wrong metadata", func() {
hSet := client.HSet(ctx, "hash", "key", "5")
Expect(hSet.Err()).NotTo(HaveOccurred())

hIncrBy := client.HIncrBy(ctx, "hash", "key", 1)
Expect(hIncrBy.Err()).NotTo(HaveOccurred())
Expect(hIncrBy.Val()).To(Equal(int64(6)))

hDel := client.HDel(ctx, "hash", "key")
Expect(hDel.Err()).NotTo(HaveOccurred())
Expect(hDel.Val()).To(Equal(int64(1)))

hIncrBy = client.HIncrBy(ctx, "hash", "key", 1)
Expect(hIncrBy.Err()).NotTo(HaveOccurred())
Expect(hIncrBy.Val()).To(Equal(int64(1)))

hIncrBy = client.HIncrBy(ctx, "hash", "key", 2)
Expect(hIncrBy.Err()).NotTo(HaveOccurred())
Expect(hIncrBy.Val()).To(Equal(int64(3)))
})

It("should HIncrByFloat", func() {
hSet := client.HSet(ctx, "hash", "field", "10.50")
Expect(hSet.Err()).NotTo(HaveOccurred())
Expand Down

0 comments on commit bb88f82

Please sign in to comment.