diff --git a/codis/config/proxy.toml b/codis/config/proxy.toml index 1cc55ea27a..1c430855fb 100644 --- a/codis/config/proxy.toml +++ b/codis/config/proxy.toml @@ -68,8 +68,11 @@ backend_max_pipeline = 20480 backend_primary_only = false # Set backend parallel connections per server -backend_primary_parallel = 1 -backend_replica_parallel = 1 +backend_primary_parallel = 2 +backend_replica_parallel = 2 +# Set quick backend parallel connections per server +backend_primary_quick = 1 +backend_replica_quick = 1 # Set slot num max_slot_num = 1024 @@ -102,6 +105,11 @@ session_break_on_failure = false # Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log slowlog_log_slower_than = 100000 +# quick command list e.g. get, set +quick_cmd_list = "" +# slow command list e.g. hgetall, mset +slow_cmd_list = "" + # Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period. metrics_report_server = "" metrics_report_period = "1s" diff --git a/codis/go.mod b/codis/go.mod index d3e6e65a3c..e4af7493af 100644 --- a/codis/go.mod +++ b/codis/go.mod @@ -1,6 +1,6 @@ module pika/codis/v2 -go 1.18 +go 1.19 replace github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.4 diff --git a/codis/pkg/proxy/backend.go b/codis/pkg/proxy/backend.go index 158edb4193..7c76a82176 100644 --- a/codis/pkg/proxy/backend.go +++ b/codis/pkg/proxy/backend.go @@ -460,7 +460,7 @@ func (s *sharedBackendConn) KeepAlive() { } } -func (s *sharedBackendConn) BackendConn(database int32, seed uint, must bool) *BackendConn { +func (s *sharedBackendConn) BackendConn(database int32, seed uint, must bool, isQuick bool) *BackendConn { if s == nil { return nil } @@ -474,14 +474,35 @@ func (s *sharedBackendConn) BackendConn(database int32, seed uint, must bool) *B } var parallel = s.conns[database] - var i = seed - for range parallel { - i = (i + 1) % uint(len(parallel)) - if bc := parallel[i]; bc.IsConnected() { - return bc + + /** + The seed is the result after hashing using a key, so in order to ensure + the execution order of the same key in a pipeline, do not select another + connection when the first connection is invalid. 
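+ Judging from the branch below: when a quick/slow split is configured (quick > 0),
+ connections [0, quick) are reserved for quick commands and connections
+ [quick, parallel) for the rest, each picked by the key's seed so requests for the
+ same key keep landing on the same connection; with no split configured, fall back
+ to scanning for any connected backend.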
+ */ + if quick := s.owner.quick; quick > 0 { + if isQuick { + i = seed % uint(quick) + if bc := parallel[i]; bc.IsConnected() { + return bc + } + } else { + i = uint(quick) + seed%uint(len(parallel)-quick) + if bc := parallel[i]; bc.IsConnected() { + return bc + } + } + } else { + for range parallel { + i = (i + 1) % uint(len(parallel)) + if bc := parallel[i]; bc.IsConnected() { + //log.Debugf("BackendConn: find all bc[%d]", i) + return bc + } } } + if !must { return nil } @@ -491,18 +512,23 @@ func (s *sharedBackendConn) BackendConn(database int32, seed uint, must bool) *B type sharedBackendConnPool struct { config *Config parallel int + quick int // The number of quick backend connection pool map[string]*sharedBackendConn } -func newSharedBackendConnPool(config *Config, parallel int) *sharedBackendConnPool { +func newSharedBackendConnPool(config *Config, parallel, quick int) *sharedBackendConnPool { p := &sharedBackendConnPool{ - config: config, parallel: math2.MaxInt(1, parallel), + config: config, parallel: math2.MaxInt(1, parallel), quick: math2.MaxInt(math2.MinInt(quick, parallel-1), 0), } p.pool = make(map[string]*sharedBackendConn) return p } +func (p *sharedBackendConnPool) SetQuickConn(quick int) { + p.quick = math2.MaxInt(math2.MinInt(quick, p.parallel-1), 0) +} + func (p *sharedBackendConnPool) KeepAlive() { for _, bc := range p.pool { bc.KeepAlive() diff --git a/codis/pkg/proxy/config.go b/codis/pkg/proxy/config.go index 62051c9017..50e218450a 100644 --- a/codis/pkg/proxy/config.go +++ b/codis/pkg/proxy/config.go @@ -84,8 +84,11 @@ backend_max_pipeline = 20480 backend_primary_only = false # Set backend parallel connections per server -backend_primary_parallel = 1 -backend_replica_parallel = 1 +backend_primary_parallel = 2 +backend_replica_parallel = 2 +# Set quick backend parallel connections per server +backend_primary_quick = 1 +backend_replica_quick = 1 # Set slot num max_slot_num = 1024 @@ -118,6 +121,11 @@ session_break_on_failure = false # Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log slowlog_log_slower_than = 100000 +# quick command list +quick_cmd_list = "get,set" +# slow command list +slow_cmd_list = "mget, mset" + # Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period. 
metrics_report_server = "" metrics_report_period = "1s" @@ -169,8 +177,10 @@ type Config struct { BackendMaxPipeline int `toml:"backend_max_pipeline" json:"backend_max_pipeline"` BackendPrimaryOnly bool `toml:"backend_primary_only" json:"backend_primary_only"` BackendPrimaryParallel int `toml:"backend_primary_parallel" json:"backend_primary_parallel"` + BackendPrimaryQuick int `toml:"backend_primary_quick" json:"backend_primary_quick"` MaxSlotNum int `toml:"max_slot_num" json:"max_slot_num"` BackendReplicaParallel int `toml:"backend_replica_parallel" json:"backend_replica_parallel"` + BackendReplicaQuick int `toml:"backend_replica_quick" json:"backend_replica_quick"` BackendKeepAlivePeriod timesize.Duration `toml:"backend_keepalive_period" json:"backend_keepalive_period"` BackendNumberDatabases int32 `toml:"backend_number_databases" json:"backend_number_databases"` @@ -184,6 +194,9 @@ type Config struct { SlowlogLogSlowerThan int64 `toml:"slowlog_log_slower_than" json:"slowlog_log_slower_than"` + QuickCmdList string `toml:"quick_cmd_list" json:"quick_cmd_list"` + SlowCmdList string `toml:"slow_cmd_list" json:"slow_cmd_list"` + MetricsReportServer string `toml:"metrics_report_server" json:"metrics_report_server"` MetricsReportPeriod timesize.Duration `toml:"metrics_report_period" json:"metrics_report_period"` MetricsReportInfluxdbServer string `toml:"metrics_report_influxdb_server" json:"metrics_report_influxdb_server"` @@ -285,9 +298,15 @@ func (c *Config) Validate() error { if c.BackendPrimaryParallel < 0 { return errors.New("invalid backend_primary_parallel") } + if c.BackendPrimaryQuick < 0 || c.BackendPrimaryQuick >= c.BackendPrimaryParallel { + return errors.New("invalid backend_primary_quick") + } if c.BackendReplicaParallel < 0 { return errors.New("invalid backend_replica_parallel") } + if c.BackendReplicaQuick < 0 || c.BackendReplicaQuick >= c.BackendReplicaParallel { + return errors.New("invalid backend_replica_quick") + } if c.BackendKeepAlivePeriod < 0 { return errors.New("invalid backend_keepalive_period") } diff --git a/codis/pkg/proxy/forward.go b/codis/pkg/proxy/forward.go index 1475adf491..4ccb929974 100644 --- a/codis/pkg/proxy/forward.go +++ b/codis/pkg/proxy/forward.go @@ -145,7 +145,7 @@ func (d *forwardHelper) slotsmgrt(s *Slot, hkey []byte, database int32, seed uin } m.Batch = &sync.WaitGroup{} - s.migrate.bc.BackendConn(database, seed, true).PushBack(m) + s.migrate.bc.BackendConn(database, seed, true, m.OpFlag.IsQuick()).PushBack(m) m.Batch.Wait() @@ -176,7 +176,7 @@ func (d *forwardHelper) slotsmgrtExecWrapper(s *Slot, hkey []byte, database int3 m.Multi = append(m.Multi, multi...) 
m.Batch = &sync.WaitGroup{} - s.migrate.bc.BackendConn(database, seed, true).PushBack(m) + s.migrate.bc.BackendConn(database, seed, true, m.OpFlag.IsQuick()).PushBack(m) m.Batch.Wait() @@ -214,17 +214,19 @@ func (d *forwardHelper) slotsmgrtExecWrapper(s *Slot, hkey []byte, database int3 } func (d *forwardHelper) forward2(s *Slot, r *Request) *BackendConn { - var database, seed = r.Database, r.Seed16() + var database = r.Database if s.migrate.bc == nil && !r.IsMasterOnly() && len(s.replicaGroups) != 0 { + var seed = r.Seed16() for _, group := range s.replicaGroups { var i = seed for range group { i = (i + 1) % uint(len(group)) - if bc := group[i].BackendConn(database, seed, false); bc != nil { + if bc := group[i].BackendConn(database, seed, false, r.OpFlag.IsQuick()); bc != nil { return bc } } } } - return s.backend.bc.BackendConn(database, seed, true) + // fix:https://github.com/OpenAtomFoundation/pika/issues/2174 + return s.backend.bc.BackendConn(database, uint(s.id), true, r.OpFlag.IsQuick()) } diff --git a/codis/pkg/proxy/mapper.go b/codis/pkg/proxy/mapper.go index bb68e4f3cb..43827ecb9a 100644 --- a/codis/pkg/proxy/mapper.go +++ b/codis/pkg/proxy/mapper.go @@ -8,9 +8,11 @@ import ( "hash/crc32" "strconv" "strings" + "sync" "pika/codis/v2/pkg/proxy/redis" "pika/codis/v2/pkg/utils/errors" + "pika/codis/v2/pkg/utils/log" ) var charmap [256]byte @@ -45,19 +47,28 @@ func (f OpFlag) IsMasterOnly() bool { return (f & mask) != 0 } +func (f OpFlag) IsQuick() bool { + return (f & FlagQuick) != 0 +} + type OpInfo struct { Name string Flag OpFlag } const ( - FlagWrite = 1 << iota + FlagWrite OpFlag = 1 << iota FlagMasterOnly FlagMayWrite FlagNotAllow + FlagQuick + FlagSlow ) -var opTable = make(map[string]OpInfo, 256) +var ( + opTableLock sync.RWMutex + opTable = make(map[string]OpInfo, 256) +) func init() { for _, i := range []OpInfo{ @@ -290,6 +301,10 @@ func getOpInfo(multi []*redis.Resp) (string, OpFlag, error) { } } op = upper[:len(op)] + + opTableLock.RLock() + defer opTableLock.RUnlock() + if r, ok := opTable[string(op)]; ok { return r.Name, r.Flag, nil } @@ -347,3 +362,67 @@ func getWholeCmd(multi []*redis.Resp, cmd []byte) int { } return index } + +func setCmdListFlag(cmdlist string, flag OpFlag) error { + reverseFlag := FlagSlow + flagString := "FlagQuick" + if flag&FlagSlow != 0 { + reverseFlag = FlagQuick + flagString = "FlagSlow" + } + + opTableLock.Lock() + defer opTableLock.Unlock() + + for _, r := range opTable { + r.Flag = r.Flag &^ flag + opTable[r.Name] = r + } + if len(cmdlist) == 0 { + return nil + } + cmdlist = strings.ToUpper(cmdlist) + cmds := strings.Split(cmdlist, ",") + for i := 0; i < len(cmds); i++ { + if r, ok := opTable[strings.TrimSpace(cmds[i])]; ok { + log.Infof("before setCmdListFlag: r.Name[%s], r.Flag[%d]", r.Name, r.Flag) + if r.Flag&reverseFlag == 0 { + r.Flag = r.Flag | flag + opTable[strings.TrimSpace(cmds[i])] = r + log.Infof("after setCmdListFlag: r.Name[%s], r.Flag[%d]", r.Name, r.Flag) + } else { + log.Warnf("cmd[%s] is %s command.", cmds[i], flagString) + return errors.Errorf("cmd[%s] is %s command.", cmds[i], flagString) + } + } else { + log.Warnf("can not find [%s] command.", cmds[i]) + return errors.Errorf("can not find [%s] command.", cmds[i]) + } + } + return nil +} + +func getCmdFlag() *redis.Resp { + var array = make([]*redis.Resp, 0, 32) + const mask = FlagQuick | FlagSlow + + opTableLock.RLock() + defer opTableLock.RUnlock() + + for _, r := range opTable { + if r.Flag&mask != 0 { + retStr := r.Name + " : Flag[" + strconv.Itoa(int(r.Flag)) + 
"]" + + if r.Flag&FlagQuick != 0 { + retStr += ", FlagQuick" + } + + if r.Flag&FlagSlow != 0 { + retStr += ", FlagSlow" + } + + array = append(array, redis.NewBulkBytes([]byte(retStr))) + } + } + return redis.NewArray(array) +} diff --git a/codis/pkg/proxy/proxy.go b/codis/pkg/proxy/proxy.go index ba6115cd4a..31ebb344dd 100644 --- a/codis/pkg/proxy/proxy.go +++ b/codis/pkg/proxy/proxy.go @@ -262,8 +262,14 @@ func (p *Proxy) ConfigGet(key string) *redis.Resp { return redis.NewBulkBytes([]byte(strconv.FormatBool(p.config.BackendPrimaryOnly))) case "max_slot_num": return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.MaxSlotNum))) + case "backend_primary_parallel": + return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.BackendPrimaryParallel))) case "backend_replica_parallel": return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.BackendReplicaParallel))) + case "backend_primary_quick": + return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.BackendPrimaryQuick))) + case "backend_replica_quick": + return redis.NewBulkBytes([]byte(strconv.Itoa(p.config.BackendReplicaQuick))) case "backend_keepalive_period": return redis.NewBulkBytes([]byte(p.config.BackendKeepAlivePeriod.Duration().String())) case "backend_number_databases": @@ -308,6 +314,12 @@ func (p *Proxy) ConfigGet(key string) *redis.Resp { redis.NewBulkBytes([]byte("metrics_report_statsd_prefix")), redis.NewBulkBytes([]byte(p.config.MetricsReportStatsdPrefix)), }) + case "quick_cmd_list": + return redis.NewBulkBytes([]byte(p.config.QuickCmdList)) + case "slow_cmd_list": + return redis.NewBulkBytes([]byte(p.config.SlowCmdList)) + case "quick_slow_cmd": + return getCmdFlag() case "max_delay_refresh_time_interval": if text, err := p.config.MaxDelayRefreshTimeInterval.MarshalText(); err != nil { return redis.NewErrorf("cant get max_delay_refresh_time_interval value.") @@ -348,6 +360,50 @@ func (p *Proxy) ConfigSet(key, value string) *redis.Resp { } p.config.SlowlogLogSlowerThan = n return redis.NewString([]byte("OK")) + case "quick_cmd_list": + err := setCmdListFlag(value, FlagQuick) + if err != nil { + log.Warnf("setQuickCmdList config[%s] failed, recover old config[%s].", value, p.config.QuickCmdList) + setCmdListFlag(p.config.QuickCmdList, FlagQuick) + return redis.NewErrorf("err:%s.", err) + } + p.config.QuickCmdList = value + return redis.NewString([]byte("OK")) + case "slow_cmd_list": + err := setCmdListFlag(value, FlagSlow) + if err != nil { + log.Warnf("setSlowCmdList config[%s] failed, recover old config[%s].", value, p.config.SlowCmdList) + setCmdListFlag(p.config.SlowCmdList, FlagSlow) + return redis.NewErrorf("err:%s.", err) + } + p.config.SlowCmdList = value + return redis.NewString([]byte("OK")) + case "backend_replica_quick": + n, err := strconv.Atoi(value) + if err != nil { + return redis.NewErrorf("err:%s.", err) + } + + if n < 0 || n >= p.config.BackendReplicaParallel { + return redis.NewErrorf("invalid backend_replica_quick") + } else { + p.config.BackendReplicaQuick = n + p.router.SetReplicaQuickConn(p.config.BackendReplicaQuick) + return redis.NewString([]byte("OK")) + } + case "backend_primary_quick": + n, err := strconv.Atoi(value) + if err != nil { + return redis.NewErrorf("err:%s.", err) + } + + if n < 0 || n >= p.config.BackendPrimaryParallel { + return redis.NewErrorf("invalid backend_primary_quick") + } else { + p.config.BackendPrimaryQuick = n + p.router.SetPrimaryQuickConn(p.config.BackendPrimaryQuick) + return redis.NewString([]byte("OK")) + } case "max_delay_refresh_time_interval": s := 
&(p.config.MaxDelayRefreshTimeInterval) err := s.UnmarshalText([]byte(value)) @@ -492,6 +548,13 @@ func (p *Proxy) serveProxy() { go p.keepAlive(d) } + if err := setCmdListFlag(p.config.QuickCmdList, FlagQuick); err != nil { + log.PanicErrorf(err, "setQuickCmdList [%s] failed", p.config.QuickCmdList) + } + if err := setCmdListFlag(p.config.SlowCmdList, FlagSlow); err != nil { + log.PanicErrorf(err, "setSlowCmdList [%s] failed", p.config.SlowCmdList) + } + select { case <-p.exit.C: log.Warnf("[%p] proxy shutdown", p) diff --git a/codis/pkg/proxy/request.go b/codis/pkg/proxy/request.go index a7bdc6a544..26158557c2 100644 --- a/codis/pkg/proxy/request.go +++ b/codis/pkg/proxy/request.go @@ -68,6 +68,8 @@ type RequestChan struct { waits int closed bool + + OpFlag } const DefaultRequestChanBuffer = 128 diff --git a/codis/pkg/proxy/router.go b/codis/pkg/proxy/router.go index b5cc258795..0ce5cccd86 100644 --- a/codis/pkg/proxy/router.go +++ b/codis/pkg/proxy/router.go @@ -29,8 +29,8 @@ type Router struct { func NewRouter(config *Config) *Router { s := &Router{config: config} - s.pool.primary = newSharedBackendConnPool(config, config.BackendPrimaryParallel) - s.pool.replica = newSharedBackendConnPool(config, config.BackendReplicaParallel) + s.pool.primary = newSharedBackendConnPool(config, config.BackendPrimaryParallel, config.BackendPrimaryQuick) + s.pool.replica = newSharedBackendConnPool(config, config.BackendReplicaParallel, config.BackendReplicaQuick) s.slots = make([]Slot, models.GetMaxSlotNum()) for i := range s.slots { s.slots[i].id = i @@ -153,11 +153,11 @@ func (s *Router) dispatchSlot(r *Request, id int) error { func (s *Router) dispatchAddr(r *Request, addr string) bool { s.mu.RLock() defer s.mu.RUnlock() - if bc := s.pool.primary.Get(addr).BackendConn(r.Database, r.Seed16(), false); bc != nil { + if bc := s.pool.primary.Get(addr).BackendConn(r.Database, r.Seed16(), false, r.OpFlag.IsQuick()); bc != nil { bc.PushBack(r) return true } - if bc := s.pool.replica.Get(addr).BackendConn(r.Database, r.Seed16(), false); bc != nil { + if bc := s.pool.replica.Get(addr).BackendConn(r.Database, r.Seed16(), false, r.OpFlag.IsQuick()); bc != nil { bc.PushBack(r) return true } @@ -231,6 +231,16 @@ func (s *Router) fillSlot(m *models.Slot, switched bool, method forwardMethod) { } } +// SetPrimaryQuickConn Set the number of quick connections. +func (s *Router) SetPrimaryQuickConn(quick int) { + s.pool.primary.SetQuickConn(quick) +} + +// SetReplicaQuickConn Set the number of quick connections. +func (s *Router) SetReplicaQuickConn(quick int) { + s.pool.replica.SetQuickConn(quick) +} + func (s *Router) SwitchMasters(masters map[int]string) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/conf/pika.conf b/conf/pika.conf index e902d45a1e..91b8aa2ead 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -23,6 +23,13 @@ thread-num : 1 # are dedicated to handling user requests. thread-pool-size : 12 +# Size of the low level thread pool, The threads within this pool +# are dedicated to handling slow user requests. +slow-cmd-thread-pool-size : 4 + +# Slow cmd list e.g. hgetall, mset +slow-cmd-list : + # The number of sync-thread for data replication from master, those are the threads work on slave nodes # and are used to execute commands sent from master node when replicating. 
sync-thread-num : 6 diff --git a/include/pika_conf.h b/include/pika_conf.h index 92ec11c74b..c5d2bfacb3 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -55,6 +55,10 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return thread_pool_size_; } + int slow_cmd_thread_pool_size() { + std::shared_lock l(rwlock_); + return slow_cmd_thread_pool_size_; + } int sync_thread_num() { std::shared_lock l(rwlock_); return sync_thread_num_; @@ -367,6 +371,18 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return max_rsync_parallel_num_; } + + // Slow Commands configuration + const std::string GetSlowCmd() { + std::shared_lock l(rwlock_); + return pstd::Set2String(slow_cmd_set_, ','); + } + + bool is_slow_cmd(const std::string& cmd) { + std::shared_lock l(rwlock_); + return slow_cmd_set_.find(cmd) != slow_cmd_set_.end(); + } + // Immutable config items, we don't use lock. bool daemonize() { return daemonize_; } std::string pidfile() { return pidfile_; } @@ -394,6 +410,12 @@ class PikaConf : public pstd::BaseConf { std::lock_guard l(rwlock_); thread_pool_size_ = value; } + + void SetLowLevelThreadPoolSize(const int value) { + std::lock_guard l(rwlock_); + slow_cmd_thread_pool_size_ = value; + } + void SetSlaveof(const std::string& value) { std::lock_guard l(rwlock_); TryPushDiffCommands("slaveof", value); @@ -611,6 +633,14 @@ class PikaConf : public pstd::BaseConf { max_rsync_parallel_num_ = value; } + void SetSlowCmd(const std::string& value) { + std::lock_guard l(rwlock_); + std::string lower_value = value; + pstd::StringToLower(lower_value); + TryPushDiffCommands("slow-cmd-list", lower_value); + pstd::StringSplit2Set(lower_value, ',', slow_cmd_set_); + } + void SetCacheType(const std::string &value); void SetCacheDisableFlag() { tmp_cache_disable_flag_ = true; } int zset_cache_start_pos() { return zset_cache_start_pos_; } @@ -631,6 +661,7 @@ class PikaConf : public pstd::BaseConf { int Load(); int ConfigRewrite(); int ConfigRewriteReplicationID(); + private: pstd::Status InternalGetTargetDB(const std::string& db_name, uint32_t* target); @@ -639,6 +670,8 @@ class PikaConf : public pstd::BaseConf { int slave_priority_ = 0; int thread_num_ = 0; int thread_pool_size_ = 0; + int slow_cmd_thread_pool_size_ = 0; + std::unordered_set slow_cmd_set_; int sync_thread_num_ = 0; std::string log_path_; std::string log_level_; diff --git a/include/pika_server.h b/include/pika_server.h index 870eeccc4d..95acae4855 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -269,11 +269,13 @@ class PikaServer : public pstd::noncopyable { /* * PikaClientProcessor Process Task */ - void ScheduleClientPool(net::TaskFunc func, void* arg); + void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd); void ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str); // for info debug size_t ClientProcessorThreadPoolCurQueueSize(); size_t ClientProcessorThreadPoolMaxQueueSize(); + size_t SlowCmdThreadPoolCurQueueSize(); + size_t SlowCmdThreadPoolMaxQueueSize(); /* * BGSave used @@ -620,6 +622,7 @@ class PikaServer : public pstd::noncopyable { */ int worker_num_ = 0; std::unique_ptr pika_client_processor_; + std::unique_ptr pika_slow_cmd_thread_pool_; std::unique_ptr pika_dispatch_thread_ = nullptr; /* diff --git a/src/pika_admin.cc b/src/pika_admin.cc index c62732358b..668f2d178e 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1551,6 +1551,18 @@ void ConfigCmd::ConfigGet(std::string& ret) { 
EncodeNumber(&config_body, g_pika_conf->thread_pool_size()); } + if (pstd::stringmatch(pattern.data(), "slow-cmd-thread-pool-size", 1) != 0) { + elements += 2; + EncodeString(&config_body, "slow-cmd-thread-pool-size"); + EncodeNumber(&config_body, g_pika_conf->slow_cmd_thread_pool_size()); + } + + if (pstd::stringmatch(pattern.data(), "slow-cmd-list", 1) != 0) { + elements += 2; + EncodeString(&config_body, "slow-cmd-list"); + EncodeString(&config_body, g_pika_conf->GetSlowCmd()); + } + if (pstd::stringmatch(pattern.data(), "sync-thread-num", 1) != 0) { elements += 2; EncodeString(&config_body, "sync-thread-num"); @@ -2084,6 +2096,7 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr slot) { EncodeString(&ret, "compact-interval"); EncodeString(&ret, "slave-priority"); EncodeString(&ret, "sync-window-size"); + EncodeString(&ret, "slow-cmd-list"); // Options for storage engine // MutableDBOptions EncodeString(&ret, "max-cache-files"); @@ -2320,6 +2333,9 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr slot) { } g_pika_conf->SetSyncWindowSize(static_cast(ival)); ret = "+OK\r\n"; + } else if (set_item == "slow-cmd-list") { + g_pika_conf->SetSlowCmd(value); + ret = "+OK\r\n"; } else if (set_item == "max-cache-files") { if (pstd::string2int(value.data(), value.size(), &ival) == 0) { ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-cache-files'\r\n"; diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index 3ce101d8e7..feb7975db0 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -191,7 +191,16 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& arg->redis_cmds = argvs; time_stat_->enqueue_ts_ = pstd::NowMicros(); arg->conn_ptr = std::dynamic_pointer_cast(shared_from_this()); - g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg); + /** + * If using the pipeline method to transmit batch commands to Pika, it is unable to + * correctly distinguish between fast and slow commands. + * However, if using the pipeline method for Codis, it can correctly distinguish between + * fast and slow commands, but it cannot guarantee sequential execution. 
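+   * Here the pool is chosen by the first command of the batch: if its lowercased
+   * name appears in slow-cmd-list, the whole batch is scheduled on the dedicated
+   * slow-command thread pool, otherwise on the regular client processor pool.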
+ */ + std::string opt = argvs[0][0]; + pstd::StringToLower(opt); + bool is_slow_cmd = g_pika_conf->is_slow_cmd(opt); + g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd); return; } BatchExecRedisCmd(argvs); diff --git a/src/pika_conf.cc b/src/pika_conf.cc index d194363757..950cd55dbd 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -261,6 +261,19 @@ int PikaConf::Load() { if (thread_pool_size_ > 100) { thread_pool_size_ = 100; } + + GetConfInt("slow-cmd-thread-pool-size", &slow_cmd_thread_pool_size_); + if (slow_cmd_thread_pool_size_ <= 0) { + slow_cmd_thread_pool_size_ = 12; + } + if (slow_cmd_thread_pool_size_ > 100) { + slow_cmd_thread_pool_size_ = 100; + } + + std::string slow_cmd_list; + GetConfStr("slow-cmd-list", &slow_cmd_list); + SetSlowCmd(slow_cmd_list); + GetConfInt("sync-thread-num", &sync_thread_num_); if (sync_thread_num_ <= 0) { sync_thread_num_ = 3; @@ -747,6 +760,7 @@ int PikaConf::ConfigRewrite() { SetConfInt("sync-window-size", sync_window_size_.load()); SetConfInt("consensus-level", consensus_level_.load()); SetConfInt("replication-num", replication_num_.load()); + SetConfStr("slow-cmd-list", pstd::Set2String(slow_cmd_set_, ',')); // options for storage engine SetConfInt("max-cache-files", max_cache_files_); SetConfInt("max-background-compactions", max_background_compactions_); diff --git a/src/pika_list.cc b/src/pika_list.cc index 6d49c04e09..a11824e864 100644 --- a/src/pika_list.cc +++ b/src/pika_list.cc @@ -166,7 +166,8 @@ void BlockingBaseCmd::TryToServeBLrPopWithThisKey(const std::string& key, std::s } auto* args = new UnblockTaskArgs(key, std::move(slot), dispatchThread); - g_pika_server->ScheduleClientPool(&ServeAndUnblockConns, args); + bool is_slow_cmd = g_pika_conf->is_slow_cmd("LPOP") || g_pika_conf->is_slow_cmd("RPOP"); + g_pika_server->ScheduleClientPool(&ServeAndUnblockConns, args, is_slow_cmd); } void BlockingBaseCmd::ServeAndUnblockConns(void* args) { diff --git a/src/pika_server.cc b/src/pika_server.cc index 5cf0873ed2..a5867dc161 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -89,6 +89,7 @@ PikaServer::PikaServer() pika_migrate_thread_ = std::make_unique(); pika_client_processor_ = std::make_unique(g_pika_conf->thread_pool_size(), 100000); + pika_slow_cmd_thread_pool_ = std::make_unique(g_pika_conf->slow_cmd_thread_pool_size(), 100000); instant_ = std::make_unique(); exit_mutex_.lock(); int64_t lastsave = GetLastSaveTime(g_pika_conf->bgsave_path()); @@ -100,7 +101,7 @@ PikaServer::~PikaServer() { // DispatchThread will use queue of worker thread, // so we need to delete dispatch before worker. pika_client_processor_->Stop(); - + pika_slow_cmd_thread_pool_->stop_thread_pool(); { std::lock_guard l(slave_mutex_); auto iter = slaves_.begin(); @@ -159,6 +160,12 @@ void PikaServer::Start() { LOG(FATAL) << "Start PikaClientProcessor Error: " << ret << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); } + ret = pika_slow_cmd_thread_pool_->start_thread_pool(); + if (ret != net::kSuccess) { + dbs_.clear(); + LOG(FATAL) << "Start PikaLowLevelThreadPool Error: " << ret + << (ret == net::kCreateThreadError ? 
": create thread error " : ": other error"); + } ret = pika_dispatch_thread_->StartThread(); if (ret != net::kSuccess) { dbs_.clear(); @@ -875,7 +882,13 @@ void PikaServer::SetFirstMetaSync(bool v) { first_meta_sync_ = v; } -void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg) { pika_client_processor_->SchedulePool(func, arg); } +void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd) { + if (is_slow_cmd) { + pika_slow_cmd_thread_pool_->Schedule(func, arg); + return; + } + pika_client_processor_->SchedulePool(func, arg); +} void PikaServer::ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str) { pika_client_processor_->ScheduleBgThreads(func, arg, hash_str); @@ -895,6 +908,22 @@ size_t PikaServer::ClientProcessorThreadPoolMaxQueueSize() { return pika_client_processor_->ThreadPoolMaxQueueSize(); } +size_t PikaServer::SlowCmdThreadPoolCurQueueSize() { + if (!pika_slow_cmd_thread_pool_) { + return 0; + } + size_t cur_size = 0; + pika_slow_cmd_thread_pool_->cur_queue_size(&cur_size); + return cur_size; +} + +size_t PikaServer::SlowCmdThreadPoolMaxQueueSize() { + if (!pika_slow_cmd_thread_pool_) { + return 0; + } + return pika_slow_cmd_thread_pool_->max_queue_size(); +} + void PikaServer::BGSaveTaskSchedule(net::TaskFunc func, void* arg) { bgsave_thread_.StartThread(); bgsave_thread_.Schedule(func, arg); diff --git a/src/pstd/include/pstd_string.h b/src/pstd/include/pstd_string.h index 84de1ddf4a..416e69d96a 100644 --- a/src/pstd/include/pstd_string.h +++ b/src/pstd/include/pstd_string.h @@ -37,6 +37,7 @@ #include #include +#include namespace pstd { @@ -50,6 +51,8 @@ int string2int(const char* s, size_t slen, unsigned long* lval); int d2string(char* buf, size_t len, double value); int string2d(const char* s, size_t slen, double* dval); std::vector& StringSplit(const std::string& s, char delim, std::vector& elems); +void StringSplit2Set(const std::string& s, char delim, std::unordered_set& elems); +std::string Set2String(const std::unordered_set& elems, char delim); std::string StringConcat(const std::vector& elems, char delim); std::string& StringToLower(std::string& ori); std::string& StringToUpper(std::string& ori); diff --git a/src/pstd/src/pstd_string.cc b/src/pstd/src/pstd_string.cc index 736a1b7604..79b111afa2 100644 --- a/src/pstd/src/pstd_string.cc +++ b/src/pstd/src/pstd_string.cc @@ -597,6 +597,30 @@ std::vector& StringSplit(const std::string& s, char delim, std::vec return elems; } +void StringSplit2Set(const std::string& s, char delim, std::unordered_set& elems) { + elems.clear(); + std::stringstream ss(s); + std::string item; + while (std::getline(ss, item, delim)) { + item = pstd::StringTrim(item); + if (!item.empty()) { + elems.emplace(item); + } + } +} + +std::string Set2String(const std::unordered_set& elems, char delim) { + std::string value; + for (const auto &e : elems) { + value.append(e); + value.append(1, delim); + } + if (!value.empty()) { + value.resize(value.size() - 1); + } + return value; +} + std::string StringConcat(const std::vector& elems, char delim) { std::string result; auto it = elems.begin();