Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:separation of fast and slow commands #2162

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions codis/config/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,11 @@ backend_max_pipeline = 20480
backend_primary_only = false

# Set backend parallel connections per server
backend_primary_parallel = 1
backend_replica_parallel = 1
backend_primary_parallel = 2
dingxiaoshuai123 marked this conversation as resolved.
Show resolved Hide resolved
backend_replica_parallel = 2
# Set quick backend parallel connections per server
backend_primary_quick = 1
backend_replica_quick = 1

# Set slot num
max_slot_num = 1024
Expand Down Expand Up @@ -102,6 +105,11 @@ session_break_on_failure = false
# Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log
slowlog_log_slower_than = 100000

# quick command list e.g. get, set
quick_cmd_list = ""
dingxiaoshuai123 marked this conversation as resolved.
Show resolved Hide resolved
# slow command list e.g. hgetall, mset
slow_cmd_list = ""

# Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period.
metrics_report_server = ""
metrics_report_period = "1s"
Expand Down
2 changes: 1 addition & 1 deletion codis/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module pika/codis/v2

go 1.18
go 1.19

replace github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.4

Expand Down
42 changes: 34 additions & 8 deletions codis/pkg/proxy/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (s *sharedBackendConn) KeepAlive() {
}
}

func (s *sharedBackendConn) BackendConn(database int32, seed uint, must bool) *BackendConn {
func (s *sharedBackendConn) BackendConn(database int32, seed uint, must bool, isQuick bool) *BackendConn {
if s == nil {
return nil
}
Expand All @@ -474,14 +474,35 @@ func (s *sharedBackendConn) BackendConn(database int32, seed uint, must bool) *B
}

var parallel = s.conns[database]

var i = seed
for range parallel {
i = (i + 1) % uint(len(parallel))
if bc := parallel[i]; bc.IsConnected() {
return bc

/**
The seed is the result after hashing using a key, so in order to ensure
the execution order of the same key in a pipeline, do not select another
connection when the first connection is invalid.
*/
if quick := s.owner.quick; quick > 0 {
if isQuick {
i = seed % uint(quick)
if bc := parallel[i]; bc.IsConnected() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个地方,原来是在一个循环里,如果一个bc无效会继续找下一个,这里是不是也该加一下。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里传入的seed是用key做的hash,为了保证pipeline中相同key的有序,所以没有找下一个连接。之前不知道为什么是进行了遍历,所以新添加的代码没有进行循环找conn

return bc
}
} else {
i = uint(quick) + seed%uint(len(parallel)-quick)
dingxiaoshuai123 marked this conversation as resolved.
Show resolved Hide resolved
if bc := parallel[i]; bc.IsConnected() {
return bc
}
}
} else {
for range parallel {
i = (i + 1) % uint(len(parallel))
if bc := parallel[i]; bc.IsConnected() {
//log.Debugf("BackendConn: find all bc[%d]", i)
return bc
}
}
}

if !must {
return nil
}
Expand All @@ -491,18 +512,23 @@ func (s *sharedBackendConn) BackendConn(database int32, seed uint, must bool) *B
type sharedBackendConnPool struct {
config *Config
parallel int
quick int // The number of quick backend connection

pool map[string]*sharedBackendConn
}

func newSharedBackendConnPool(config *Config, parallel int) *sharedBackendConnPool {
func newSharedBackendConnPool(config *Config, parallel, quick int) *sharedBackendConnPool {
p := &sharedBackendConnPool{
config: config, parallel: math2.MaxInt(1, parallel),
config: config, parallel: math2.MaxInt(1, parallel), quick: math2.MaxInt(math2.MinInt(quick, parallel-1), 0),
}
p.pool = make(map[string]*sharedBackendConn)
return p
}

func (p *sharedBackendConnPool) SetQuickConn(quick int) {
p.quick = math2.MaxInt(math2.MinInt(quick, p.parallel-1), 0)
}

func (p *sharedBackendConnPool) KeepAlive() {
for _, bc := range p.pool {
bc.KeepAlive()
Expand Down
23 changes: 21 additions & 2 deletions codis/pkg/proxy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,11 @@ backend_max_pipeline = 20480
backend_primary_only = false

# Set backend parallel connections per server
backend_primary_parallel = 1
backend_replica_parallel = 1
backend_primary_parallel = 2
backend_replica_parallel = 2
# Set quick backend parallel connections per server
backend_primary_quick = 1
backend_replica_quick = 1

# Set slot num
max_slot_num = 1024
Expand Down Expand Up @@ -118,6 +121,11 @@ session_break_on_failure = false
# Slowlog-log-slower-than(us), from receive command to send response, 0 is allways print slow log
slowlog_log_slower_than = 100000

# quick command list
quick_cmd_list = "get,set"
dingxiaoshuai123 marked this conversation as resolved.
Show resolved Hide resolved
dingxiaoshuai123 marked this conversation as resolved.
Show resolved Hide resolved
# slow command list
slow_cmd_list = "mget, mset"

# Set metrics server (such as http://localhost:28000), proxy will report json formatted metrics to specified server in a predefined period.
metrics_report_server = ""
metrics_report_period = "1s"
Expand Down Expand Up @@ -169,8 +177,10 @@ type Config struct {
BackendMaxPipeline int `toml:"backend_max_pipeline" json:"backend_max_pipeline"`
BackendPrimaryOnly bool `toml:"backend_primary_only" json:"backend_primary_only"`
BackendPrimaryParallel int `toml:"backend_primary_parallel" json:"backend_primary_parallel"`
BackendPrimaryQuick int `toml:"backend_primary_quick" json:"backend_primary_quick"`
MaxSlotNum int `toml:"max_slot_num" json:"max_slot_num"`
BackendReplicaParallel int `toml:"backend_replica_parallel" json:"backend_replica_parallel"`
BackendReplicaQuick int `toml:"backend_replica_quick" json:"backend_replica_quick"`
BackendKeepAlivePeriod timesize.Duration `toml:"backend_keepalive_period" json:"backend_keepalive_period"`
BackendNumberDatabases int32 `toml:"backend_number_databases" json:"backend_number_databases"`

Expand All @@ -184,6 +194,9 @@ type Config struct {

SlowlogLogSlowerThan int64 `toml:"slowlog_log_slower_than" json:"slowlog_log_slower_than"`

QuickCmdList string `toml:"quick_cmd_list" json:"quick_cmd_list"`
SlowCmdList string `toml:"slow_cmd_list" json:"slow_cmd_list"`

MetricsReportServer string `toml:"metrics_report_server" json:"metrics_report_server"`
MetricsReportPeriod timesize.Duration `toml:"metrics_report_period" json:"metrics_report_period"`
MetricsReportInfluxdbServer string `toml:"metrics_report_influxdb_server" json:"metrics_report_influxdb_server"`
Expand Down Expand Up @@ -285,9 +298,15 @@ func (c *Config) Validate() error {
if c.BackendPrimaryParallel < 0 {
return errors.New("invalid backend_primary_parallel")
}
if c.BackendPrimaryQuick < 0 || c.BackendPrimaryQuick >= c.BackendPrimaryParallel {
return errors.New("invalid backend_primary_quick")
}
if c.BackendReplicaParallel < 0 {
return errors.New("invalid backend_replica_parallel")
}
if c.BackendReplicaQuick < 0 || c.BackendReplicaQuick >= c.BackendReplicaParallel {
return errors.New("invalid backend_replica_quick")
}
if c.BackendKeepAlivePeriod < 0 {
return errors.New("invalid backend_keepalive_period")
}
Expand Down
12 changes: 7 additions & 5 deletions codis/pkg/proxy/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (d *forwardHelper) slotsmgrt(s *Slot, hkey []byte, database int32, seed uin
}
m.Batch = &sync.WaitGroup{}

s.migrate.bc.BackendConn(database, seed, true).PushBack(m)
s.migrate.bc.BackendConn(database, seed, true, m.OpFlag.IsQuick()).PushBack(m)

m.Batch.Wait()

Expand Down Expand Up @@ -176,7 +176,7 @@ func (d *forwardHelper) slotsmgrtExecWrapper(s *Slot, hkey []byte, database int3
m.Multi = append(m.Multi, multi...)
m.Batch = &sync.WaitGroup{}

s.migrate.bc.BackendConn(database, seed, true).PushBack(m)
s.migrate.bc.BackendConn(database, seed, true, m.OpFlag.IsQuick()).PushBack(m)

m.Batch.Wait()

Expand Down Expand Up @@ -214,17 +214,19 @@ func (d *forwardHelper) slotsmgrtExecWrapper(s *Slot, hkey []byte, database int3
}

func (d *forwardHelper) forward2(s *Slot, r *Request) *BackendConn {
var database, seed = r.Database, r.Seed16()
var database = r.Database
if s.migrate.bc == nil && !r.IsMasterOnly() && len(s.replicaGroups) != 0 {
var seed = r.Seed16()
for _, group := range s.replicaGroups {
var i = seed
for range group {
i = (i + 1) % uint(len(group))
if bc := group[i].BackendConn(database, seed, false); bc != nil {
if bc := group[i].BackendConn(database, seed, false, r.OpFlag.IsQuick()); bc != nil {
return bc
}
}
}
}
return s.backend.bc.BackendConn(database, seed, true)
// fix:https://github.com/OpenAtomFoundation/pika/issues/2174
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这行去掉吧 可以放在comment中

return s.backend.bc.BackendConn(database, uint(s.id), true, r.OpFlag.IsQuick())
}
83 changes: 81 additions & 2 deletions codis/pkg/proxy/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"hash/crc32"
"strconv"
"strings"
"sync"

"pika/codis/v2/pkg/proxy/redis"
"pika/codis/v2/pkg/utils/errors"
"pika/codis/v2/pkg/utils/log"
)

var charmap [256]byte
Expand Down Expand Up @@ -45,19 +47,28 @@ func (f OpFlag) IsMasterOnly() bool {
return (f & mask) != 0
}

func (f OpFlag) IsQuick() bool {
return (f & FlagQuick) != 0
}

type OpInfo struct {
Name string
Flag OpFlag
}

const (
FlagWrite = 1 << iota
FlagWrite OpFlag = 1 << iota
FlagMasterOnly
FlagMayWrite
FlagNotAllow
FlagQuick
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick和Slow不是互斥的吗?需要两个都定义吗?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

理论上只需要FlagSlow即可,但是为了简化一些逻辑就加了个FlagQuick,这样在一些打印快慢列表的函数中逻辑更清晰点,不需要的话删除也行

FlagSlow
)

var opTable = make(map[string]OpInfo, 256)
var (
opTableLock sync.RWMutex
opTable = make(map[string]OpInfo, 256)
)

func init() {
for _, i := range []OpInfo{
Expand Down Expand Up @@ -290,6 +301,10 @@ func getOpInfo(multi []*redis.Resp) (string, OpFlag, error) {
}
}
op = upper[:len(op)]

opTableLock.RLock()
defer opTableLock.RUnlock()

if r, ok := opTable[string(op)]; ok {
return r.Name, r.Flag, nil
}
Expand Down Expand Up @@ -347,3 +362,67 @@ func getWholeCmd(multi []*redis.Resp, cmd []byte) int {
}
return index
}

func setCmdListFlag(cmdlist string, flag OpFlag) error {
reverseFlag := FlagSlow
flagString := "FlagQuick"
if flag&FlagSlow != 0 {
reverseFlag = FlagQuick
flagString = "FlagSlow"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥FlagQuick 没有引号 FlagSlow有引号 这是有什么特殊的考虑吗

}

opTableLock.Lock()
defer opTableLock.Unlock()

for _, r := range opTable {
r.Flag = r.Flag &^ flag
opTable[r.Name] = r
}
if len(cmdlist) == 0 {
return nil
}
cmdlist = strings.ToUpper(cmdlist)
cmds := strings.Split(cmdlist, ",")
for i := 0; i < len(cmds); i++ {
if r, ok := opTable[strings.TrimSpace(cmds[i])]; ok {
log.Infof("before setCmdListFlag: r.Name[%s], r.Flag[%d]", r.Name, r.Flag)
if r.Flag&reverseFlag == 0 {
r.Flag = r.Flag | flag
opTable[strings.TrimSpace(cmds[i])] = r
log.Infof("after setCmdListFlag: r.Name[%s], r.Flag[%d]", r.Name, r.Flag)
} else {
log.Warnf("cmd[%s] is %s command.", cmds[i], flagString)
return errors.Errorf("cmd[%s] is %s command.", cmds[i], flagString)
}
} else {
log.Warnf("can not find [%s] command.", cmds[i])
return errors.Errorf("can not find [%s] command.", cmds[i])
}
}
return nil
}

func getCmdFlag() *redis.Resp {
var array = make([]*redis.Resp, 0, 32)
const mask = FlagQuick | FlagSlow

opTableLock.RLock()
defer opTableLock.RUnlock()

for _, r := range opTable {
if r.Flag&mask != 0 {
retStr := r.Name + " : Flag[" + strconv.Itoa(int(r.Flag)) + "]"

if r.Flag&FlagQuick != 0 {
retStr += ", FlagQuick"
}

if r.Flag&FlagSlow != 0 {
retStr += ", FlagSlow"
}

array = append(array, redis.NewBulkBytes([]byte(retStr)))
}
}
return redis.NewArray(array)
}
Loading