Skip to content

Commit

Permalink
separation of fast and slow commands
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxiaoshuai123 committed Nov 24, 2023
1 parent 1838b51 commit 1aeb603
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 105 deletions.
14 changes: 1 addition & 13 deletions codis/pkg/proxy/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,6 @@ func (s *sharedBackendConn) BackendConn(database int32, seed uint, must bool, is
return nil
}

// 后端只有一个连接,不区分快慢命令
if s.single != nil {
bc := s.single[database]
if must || bc.IsConnected() {
Expand All @@ -474,29 +473,18 @@ func (s *sharedBackendConn) BackendConn(database int32, seed uint, must bool, is
return nil
}

// 每个db的并发度是相同的
var parallel = s.conns[database]
var i = seed

//for range parallel {
// i = (i + 1) % uint(len(parallel))
// if bc := parallel[i]; bc.IsConnected() {
// return bc
// }
//}

//快慢命令区分的原理是,对于类型的命令,使用不同的连接通道
if quick := s.owner.quick; quick > 0 {
if isQuick {
i = seed % uint(quick)
if bc := parallel[i]; bc.IsConnected() {
log.Warnf("BackendConn: find quick bc[%d]", i)
return bc
}
} else {
i = seed%uint(len(parallel)-quick) + uint(quick)
i = uint(quick) + seed%uint(len(parallel)-quick)
if bc := parallel[i]; bc.IsConnected() {
log.Warnf("BackendConn: find slow bc[%d]", i)
return bc
}
}
Expand Down
113 changes: 28 additions & 85 deletions codis/pkg/proxy/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,7 @@ func (f OpFlag) IsMasterOnly() bool {
}

func (f OpFlag) IsQuick() bool {
const mask = FlagSureSlow | FlagMaySlow
return (f & mask) == 0
}

func (f OpFlag) IsSureQuick() bool {
const mask = FlagSureQuick
return (f & mask) != 0
}

func (f OpFlag) IsMayQuick() bool {
const mask = FlagMayQuick
return (f & mask) != 0
return (f & FlagQuick) != 0
}

type OpInfo struct {
Expand All @@ -72,14 +61,14 @@ const (
FlagMasterOnly
FlagMayWrite
FlagNotAllow
FlagSureQuick //16
FlagMayQuick //32
FlagSureSlow //64
FlagMaySlow
FlagQuick
FlagSlow
)

var opTable = make(map[string]OpInfo, 256)

var cmdsFlag sync.RWMutex

func init() {
for _, i := range []OpInfo{
{"APPEND", FlagWrite},
Expand Down Expand Up @@ -369,14 +358,12 @@ func getWholeCmd(multi []*redis.Resp, cmd []byte) int {
return index
}

var cmdsflag sync.RWMutex

func setQuickCmdList(cmdlist string) error {
cmdsflag.Lock()
defer cmdsflag.Unlock()
cmdsFlag.Lock()
defer cmdsFlag.Unlock()

for _, r := range opTable {
r.Flag = r.Flag &^ FlagSureQuick
r.Flag = r.Flag &^ FlagQuick
opTable[r.Name] = r
}
if cmdlist == "" {
Expand All @@ -387,30 +374,27 @@ func setQuickCmdList(cmdlist string) error {
for i := 0; i < len(cmds); i++ {
if r, ok := opTable[strings.TrimSpace(cmds[i])]; ok {
log.Infof("before setQuickCmdList: r.Name[%s], r.Flag[%d]", r.Name, r.Flag)
if r.Flag&FlagSureSlow == 0 {
r.Flag = r.Flag &^ FlagMayQuick
r.Flag = r.Flag &^ FlagMaySlow
r.Flag = r.Flag | FlagSureQuick
if r.Flag&FlagSlow == 0 {
r.Flag = r.Flag | FlagQuick
opTable[strings.TrimSpace(cmds[i])] = r
log.Infof("after setQuickCmdList: r.Name[%s], r.Flag[%d]", r.Name, r.Flag)
} else {
log.Warnf("cmd[%s] is FlagSureSlow command.", cmds[i])
return errors.Errorf("cmd[%s] is FlagSureSlow command.", cmds[i])
log.Warnf("cmd[%s] is FlagSlow command.", cmds[i])
return errors.Errorf("cmd[%s] is FlagSlow command.", cmds[i])
}
} else {
log.Warnf("cant find [%s] command.", cmds[i])
return errors.Errorf("cant find [%s] command.", cmds[i])
log.Warnf("can not find [%s] command.", cmds[i])
return errors.Errorf("can not find [%s] command.", cmds[i])
}
}

return nil
}

func setSlowCmdList(cmdlist string) error {
cmdsflag.Lock()
defer cmdsflag.Unlock()
cmdsFlag.Lock()
defer cmdsFlag.Unlock()
for _, r := range opTable {
r.Flag = r.Flag &^ FlagSureSlow
r.Flag = r.Flag &^ FlagSlow
opTable[r.Name] = r
}

Expand All @@ -423,77 +407,36 @@ func setSlowCmdList(cmdlist string) error {
for i := 0; i < len(cmds); i++ {
if r, ok := opTable[strings.TrimSpace(cmds[i])]; ok {
log.Infof("before setSlowCmdList: r.Name[%s], r.Flag[%d]", r.Name, r.Flag)
if r.Flag&FlagSureQuick == 0 {
r.Flag = r.Flag &^ FlagMayQuick
r.Flag = r.Flag &^ FlagMaySlow
r.Flag = r.Flag | FlagSureSlow
if r.Flag&FlagQuick == 0 {
r.Flag = r.Flag | FlagSlow
opTable[strings.TrimSpace(cmds[i])] = r
log.Infof("after setSlowCmdList: r.Name[%s], r.Flag[%d]", r.Name, r.Flag)
} else {
log.Warnf("cmd[%s] is FlagSureQuick command.", cmds[i])
return errors.Errorf("cmd[%s] is FlagSureQuick command.", cmds[i])
log.Warnf("cmd[%s] is FlagQuick command.", cmds[i])
return errors.Errorf("cmd[%s] is FlagQuick command.", cmds[i])
}
} else {
log.Warnf("cant find [%s] command.", cmds[i])
return errors.Errorf("cant find [%s] command.", cmds[i])
log.Warnf("can not find [%s] command.", cmds[i])
return errors.Errorf("can not find [%s] command.", cmds[i])
}
}

return nil
}

func setMaySlowOpFlag(op string) error {
cmdsflag.RLock()
defer cmdsflag.RUnlock()
const mask = FlagSureQuick | FlagSureSlow
if r, ok := opTable[strings.ToUpper(op)]; ok {
if r.Flag&mask == 0 {
r.Flag = r.Flag | FlagMaySlow
}
} else {
return errors.Errorf("cant find %s.", op)
}

return nil
}

func clearMaySlowOpFlag(op string) error {
cmdsflag.RLock()
defer cmdsflag.RUnlock()
const mask = FlagSureQuick | FlagSureSlow
if r, ok := opTable[strings.ToUpper(op)]; ok {
if r.Flag&mask == 0 {
r.Flag = r.Flag &^ FlagMaySlow
}
} else {
return errors.Errorf("cant find %s.", op)
}

return nil
}

func getCmdFlag() *redis.Resp {
var array = make([]*redis.Resp, 0, 32)
const mask = FlagSureQuick | FlagMayQuick | FlagSureSlow | FlagMaySlow
const mask = FlagQuick | FlagSlow

for _, r := range opTable {
if r.Flag&mask != 0 {
retStr := r.Name + " : Flag[" + strconv.Itoa(int(r.Flag)) + "]"

if r.Flag&FlagSureQuick != 0 {
retStr += ", FlagSureQuick"
}

if r.Flag&FlagMayQuick != 0 {
retStr += ", FlagMayQuick"
}

if r.Flag&FlagSureSlow != 0 {
retStr += ", FlagSureSlow"
if r.Flag&FlagQuick != 0 {
retStr += ", FlagQuick"
}

if r.Flag&FlagMaySlow != 0 {
retStr += ", FlagMaySlow"
if r.Flag&FlagSlow != 0 {
retStr += ", FlagSlow"
}

array = append(array, redis.NewBulkBytes([]byte(retStr)))
Expand Down
7 changes: 7 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ thread-num : 1
# are dedicated to handling user requests.
thread-pool-size : 12

# Size of the low level thread pool, The threads within this pool
# are dedicated to handling slow user requests.
low-level-thread-pool-size : 4

# Slow cmd list
slow-cmd-list :

# The number of sync-thread for data replication from master, those are the threads work on slave nodes
# and are used to execute commands sent from master node when replicating.
sync-thread-num : 6
Expand Down
29 changes: 29 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return thread_pool_size_;
}
int low_level_thread_pool_size() {
std::shared_lock l(rwlock_);
return low_level_thread_pool_size_;
}
int sync_thread_num() {
std::shared_lock l(rwlock_);
return sync_thread_num_;
Expand Down Expand Up @@ -346,6 +350,17 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return max_rsync_parallel_num_;
}

// Slow Commands configuration
const std::string GetSlowCmd() {
std::shared_lock l(rwlock_);
return pstd::Map2String(slow_cmd_set_, ',');
}
bool is_slow_cmd(const std::string &cmd) {
std::shared_lock l(rwlock_);
return slow_cmd_set_.find(cmd) != slow_cmd_set_.end();
}

// Immutable config items, we don't use lock.
bool daemonize() { return daemonize_; }
std::string pidfile() { return pidfile_; }
Expand Down Expand Up @@ -373,6 +388,10 @@ class PikaConf : public pstd::BaseConf {
std::lock_guard l(rwlock_);
thread_pool_size_ = value;
}
void SetLowLevelThreadPoolSize(const int value) {
std::lock_guard l(rwlock_);
low_level_thread_pool_size_ = value;
}
void SetSlaveof(const std::string& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slaveof", value);
Expand Down Expand Up @@ -585,6 +604,14 @@ class PikaConf : public pstd::BaseConf {
max_rsync_parallel_num_ = value;
}

void SetSlowCmd(std::string& value) {
std::lock_guard l(rwlock_);
std::string lower_value = value;
pstd::StringToLower(lower_value);
TryPushDiffCommands("slow-cmd-list", lower_value);
pstd::StringSplit2Map(lower_value, ',', slow_cmd_set_);
}

pstd::Status DBSlotsSanityCheck(const std::string& db_name, const std::set<uint32_t>& slot_ids,
bool is_add);
pstd::Status AddDBSlots(const std::string& db_name, const std::set<uint32_t>& slot_ids);
Expand All @@ -606,6 +633,8 @@ class PikaConf : public pstd::BaseConf {
int slave_priority_ = 0;
int thread_num_ = 0;
int thread_pool_size_ = 0;
int low_level_thread_pool_size_ = 0;
std::unordered_map<std::string, std::string> slow_cmd_set_;
int sync_thread_num_ = 0;
std::string log_path_;
std::string log_level_;
Expand Down
5 changes: 4 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,13 @@ class PikaServer : public pstd::noncopyable {
/*
* PikaClientProcessor Process Task
*/
void ScheduleClientPool(net::TaskFunc func, void* arg);
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd);
void ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str);
// for info debug
size_t ClientProcessorThreadPoolCurQueueSize();
size_t ClientProcessorThreadPoolMaxQueueSize();
size_t LowLevelThreadPoolCurQueueSize();
size_t LowLevelThreadPoolMaxQueueSize();

/*
* BGSave used
Expand Down Expand Up @@ -555,6 +557,7 @@ class PikaServer : public pstd::noncopyable {
*/
int worker_num_ = 0;
std::unique_ptr<PikaClientProcessor> pika_client_processor_;
std::unique_ptr<net::ThreadPool> pika_low_level_thread_pool_;
std::unique_ptr<PikaDispatchThread> pika_dispatch_thread_ = nullptr;

/*
Expand Down
20 changes: 18 additions & 2 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1375,6 +1375,18 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeNumber(&config_body, g_pika_conf->thread_pool_size());
}

if (pstd::stringmatch(pattern.data(), "low-level-thread-pool-size", 1) != 0) {
elements += 2;
EncodeString(&config_body, "low-level-thread-pool-size");
EncodeNumber(&config_body, g_pika_conf->low_level_thread_pool_size());
}

if (pstd::stringmatch(pattern.data(), "slow-cmd-list", 1) != 0) {
elements += 2;
EncodeString(&config_body, "slow-cmd-list");
EncodeString(&config_body, g_pika_conf->GetSlowCmd());
}

if (pstd::stringmatch(pattern.data(), "sync-thread-num", 1) != 0) {
elements += 2;
EncodeString(&config_body, "sync-thread-num");
Expand Down Expand Up @@ -1876,7 +1888,7 @@ void ConfigCmd::ConfigGet(std::string& ret) {
void ConfigCmd::ConfigSet(std::string& ret) {
std::string set_item = config_args_v_[1];
if (set_item == "*") {
ret = "*28\r\n";
ret = "*29\r\n";
EncodeString(&ret, "timeout");
EncodeString(&ret, "requirepass");
EncodeString(&ret, "masterauth");
Expand All @@ -1901,6 +1913,7 @@ void ConfigCmd::ConfigSet(std::string& ret) {
EncodeString(&ret, "compact-interval");
EncodeString(&ret, "slave-priority");
EncodeString(&ret, "sync-window-size");
EncodeString(&ret, "slow-cmd-list");
// Options for storage engine
// MutableDBOptions
EncodeString(&ret, "max-cache-files");
Expand Down Expand Up @@ -2129,7 +2142,10 @@ void ConfigCmd::ConfigSet(std::string& ret) {
}
g_pika_conf->SetSyncWindowSize(static_cast<int>(ival));
ret = "+OK\r\n";
} else if (set_item == "max-cache-files") {
} else if (set_item == "slow-cmd-list") {
g_pika_conf->SetSlowCmd(value);
ret = "+OK\r\n";
}else if (set_item == "max-cache-files") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-cache-files'\r\n";
return;
Expand Down
11 changes: 10 additions & 1 deletion src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,16 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
arg->redis_cmds = argvs;
time_stat_->enqueue_ts_ = pstd::NowMicros();
arg->conn_ptr = std::dynamic_pointer_cast<PikaClientConn>(shared_from_this());
g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg);
/**
* If using the pipeline method to transmit batch commands to Pika, it is unable to
* correctly distinguish between fast and slow commands.
* However, if using the pipeline method for Codis, it can correctly distinguish between
* fast and slow commands, but it cannot guarantee sequential execution.
*/
std::string opt = argvs[0][0];
pstd::StringToLower(opt);
bool is_slow_cmd = g_pika_conf->is_slow_cmd(opt);
g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd);
return;
}
BatchExecRedisCmd(argvs);
Expand Down
Loading

0 comments on commit 1aeb603

Please sign in to comment.