From 8e9a9749e49aac411d699743a32783eceffbed13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A5=AD=E9=85=92?= <32018399+maxingg@users.noreply.github.com> Date: Sun, 30 Oct 2022 11:53:15 +0800 Subject: [PATCH] fix: traffic filter fix weight strategy and error handle within Apply method. (#507) Co-authored-by: maxing --- pixiu/pkg/filter/traffic/models.go | 2 +- pixiu/pkg/filter/traffic/traffic.go | 110 +++++++++++++++------------- 2 files changed, 62 insertions(+), 50 deletions(-) diff --git a/pixiu/pkg/filter/traffic/models.go b/pixiu/pkg/filter/traffic/models.go index eb6025df7..b197cb9bd 100644 --- a/pixiu/pkg/filter/traffic/models.go +++ b/pixiu/pkg/filter/traffic/models.go @@ -26,5 +26,5 @@ func spiltHeader(req *http.Request, value string) bool { } func spiltWeight(weight, floor, ceil int) bool { - return weight >= floor && weight < ceil + return weight > floor && weight <= ceil } diff --git a/pixiu/pkg/filter/traffic/traffic.go b/pixiu/pkg/filter/traffic/traffic.go index 3fd82823c..bb3553c7c 100644 --- a/pixiu/pkg/filter/traffic/traffic.go +++ b/pixiu/pkg/filter/traffic/traffic.go @@ -18,6 +18,8 @@ package traffic import ( + "errors" + "fmt" "math/rand" "strings" "time" @@ -46,12 +48,13 @@ type ( } FilterFactory struct { - cfg *Config + cfg *Config + record map[string]struct{} + rulesMap map[string][]*ClusterWrapper } Filter struct { weight int - record map[string]struct{} Rules []*ClusterWrapper } @@ -60,10 +63,10 @@ type ( } ClusterWrapper struct { - Cluster *Cluster - header string weightCeil int weightFloor int + header string + Cluster *Cluster } Cluster struct { @@ -80,7 +83,9 @@ func (p *Plugin) Kind() string { func (p *Plugin) CreateFilterFactory() (filter.HttpFilterFactory, error) { return &FilterFactory{ - cfg: &Config{}, + cfg: &Config{}, + record: map[string]struct{}{}, + rulesMap: map[string][]*ClusterWrapper{}, }, nil } @@ -89,22 +94,60 @@ func (factory *FilterFactory) Config() interface{} { } func (factory *FilterFactory) Apply() error { + var ( + router string + up int + ) + + for _, cluster := range factory.cfg.Traffics { + up = 0 + router = cluster.Router + if len(factory.rulesMap[router]) != 0 { + lastCeil := factory.rulesMap[router][len(factory.rulesMap[router])-1].weightCeil + if lastCeil != -1 { + up = lastCeil + } + } + + wp := &ClusterWrapper{ + Cluster: cluster, + weightCeil: -1, + weightFloor: -1, + } + if cluster.CanaryByHeader != "" { + if _, ok := factory.record[cluster.CanaryByHeader]; ok { + return errors.New("duplicate canary-by-header values") + } else { + factory.record[cluster.CanaryByHeader] = struct{}{} + wp.header = cluster.CanaryByHeader + } + } + if cluster.CanaryWeight > 0 && cluster.CanaryWeight <= 100 { + if up+cluster.CanaryWeight > 100 { + return fmt.Errorf("[dubbo-go-pixiu] clusters' weight sum more than 100 in %v service!", cluster.Router) + } else { + wp.weightFloor = up + up += cluster.CanaryWeight + wp.weightCeil = up + } + } + factory.rulesMap[router] = append(factory.rulesMap[router], wp) + } return nil } func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain filter.FilterChain) error { f := &Filter{ weight: unInitialize, - record: map[string]struct{}{}, } - f.Rules = factory.rulesMatch(f, ctx.Request.RequestURI) + f.Rules = factory.rulesMatch(ctx.Request.RequestURI) chain.AppendDecodeFilters(f) return nil } func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus { - cluster := "" if f.Rules != nil { + var cluster string for _, wp := range f.Rules { if f.trafficHeader(wp, ctx) { cluster = wp.Cluster.Name @@ -112,19 +155,18 @@ func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus { break } } - if cluster == "" { + if cluster != "" { + ctx.Route.Cluster = cluster + } else if cluster == "" { for _, wp := range f.Rules { - if f.trafficWeight(wp, ctx) { + if f.trafficWeight(wp) { ctx.Route.Cluster = wp.Cluster.Name cluster = wp.Cluster.Name - logger.Debugf("[dubbo-go-pixiu] execute traffic split to cluster %s", wp.Cluster.Name) + logger.Debugf("[dubbo-go-pixiu] execute traffic split to cluster %s", cluster) break } } } - if cluster != "" { - ctx.Route.Cluster = cluster - } } else { logger.Warnf("[dubbo-go-pixiu] execute traffic split fail because of empty rules.") } @@ -135,7 +177,7 @@ func (f *Filter) trafficHeader(c *ClusterWrapper, ctx *http.HttpContext) bool { return spiltHeader(ctx.Request, c.header) } -func (f *Filter) trafficWeight(c *ClusterWrapper, ctx *http.HttpContext) bool { +func (f *Filter) trafficWeight(c *ClusterWrapper) bool { if f.weight == unInitialize { rand.Seed(time.Now().UnixNano()) f.weight = rand.Intn(100) + 1 @@ -144,41 +186,11 @@ func (f *Filter) trafficWeight(c *ClusterWrapper, ctx *http.HttpContext) bool { return spiltWeight(f.weight, c.weightFloor, c.weightCeil) } -func (factory *FilterFactory) rulesMatch(f *Filter, path string) []*ClusterWrapper { - clusters := factory.cfg.Traffics - - if clusters != nil { - rules := make([]*ClusterWrapper, 0) - up := 0 - for _, cluster := range clusters { - if strings.HasPrefix(path, cluster.Router) { - wp := &ClusterWrapper{ - Cluster: cluster, - header: "", - weightCeil: -1, - weightFloor: -1, - } - if cluster.CanaryByHeader != "" { - if _, ok := f.record[cluster.CanaryByHeader]; ok { - logger.Errorf("Duplicate canary-by-header values") - } else { - f.record[cluster.CanaryByHeader] = struct{}{} - wp.header = cluster.CanaryByHeader - } - } - if cluster.CanaryWeight > 0 && cluster.CanaryWeight <= 100 { - wp.weightFloor = up - up += cluster.CanaryWeight - if up > 100 { - logger.Errorf("[dubbo-go-pixiu] clusters' weight sum more than 100 in %v service!", cluster.Router) - continue - } - wp.weightCeil = up - } - rules = append(rules, wp) - } +func (factory *FilterFactory) rulesMatch(path string) []*ClusterWrapper { + for key := range factory.rulesMap { + if strings.HasPrefix(path, key) { + return factory.rulesMap[key] } - return rules } return nil }