Skip to content

Commit

Permalink
refactor: update traffic filter to support both header and weight con…
Browse files Browse the repository at this point in the history
…fig.
  • Loading branch information
mxing-byte committed Oct 9, 2022
1 parent bbdf84e commit 964afa3
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 21 deletions.
4 changes: 2 additions & 2 deletions pixiu/pkg/filter/traffic/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"net/http"
)

func headerSpilt(req *http.Request, value string) bool {
func spiltHeader(req *http.Request, value string) bool {
return req.Header.Get(string(canaryByHeader)) == value
}

func weightSpilt(weight, floor, ceil int) bool {
func spiltWeight(weight, floor, ceil int) bool {
return weight >= floor && weight < ceil
}
45 changes: 26 additions & 19 deletions pixiu/pkg/filter/traffic/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package traffic

import (
"fmt"
"math/rand"
"strconv"
"strings"
"time"
)
Expand Down Expand Up @@ -72,7 +70,7 @@ type (
Name string `yaml:"name" json:"name" mapstructure:"name"`
Router string `yaml:"router" json:"router" mapstructure:"router"`
CanaryByHeader string `yaml:"canary-by-header" json:"canary-by-header" mapstructure:"canary-by-header"`
CanaryWeight string `yaml:"canary-weight" json:"canary-weight" mapstructure:"canary-weight"`
CanaryWeight int `yaml:"canary-weight" json:"canary-weight" mapstructure:"canary-weight"`
}
)

Expand Down Expand Up @@ -105,33 +103,45 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi
}

func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus {
cluster := ""
if f.Rules != nil {
for _, wp := range f.Rules {
if f.traffic(wp, ctx) {
ctx.Route.Cluster = wp.Cluster.Name
if f.trafficHeader(wp, ctx) {
cluster = wp.Cluster.Name
logger.Debugf("[dubbo-go-pixiu] execute traffic split to cluster %s", wp.Cluster.Name)
break
}
}
if cluster == "" {
for _, wp := range f.Rules {
if f.trafficWeight(wp, ctx) {
ctx.Route.Cluster = wp.Cluster.Name
cluster = wp.Cluster.Name
logger.Debugf("[dubbo-go-pixiu] execute traffic split to cluster %s", wp.Cluster.Name)
break
}
}
}
if cluster != "" {
ctx.Route.Cluster = cluster
}
} else {
logger.Warnf("[dubbo-go-pixiu] execute traffic split fail because of empty rules.")
}
return filter.Continue
}

func (f *Filter) traffic(c *ClusterWrapper, ctx *http.HttpContext) bool {
func (f *Filter) trafficHeader(c *ClusterWrapper, ctx *http.HttpContext) bool {
return spiltHeader(ctx.Request, c.header)
}

func (f *Filter) trafficWeight(c *ClusterWrapper, ctx *http.HttpContext) bool {
if f.weight == unInitialize {
rand.Seed(time.Now().UnixNano())
f.weight = rand.Intn(100) + 1
}

res := false
if c.header != "" {
res = headerSpilt(ctx.Request, c.header)
} else if !res && c.weightFloor != -1 && c.weightCeil != -1 {
res = weightSpilt(f.weight, c.weightFloor, c.weightCeil)
}
return res
return spiltWeight(f.weight, c.weightFloor, c.weightCeil)
}

func (factory *FilterFactory) rulesMatch(f *Filter, path string) []*ClusterWrapper {
Expand All @@ -156,15 +166,12 @@ func (factory *FilterFactory) rulesMatch(f *Filter, path string) []*ClusterWrapp
wp.header = cluster.CanaryByHeader
}
}
if cluster.CanaryWeight != "" {
val, err := strconv.Atoi(cluster.CanaryWeight)
if err != nil || val <= 0 {
logger.Errorf(fmt.Sprintf("Wrong canary-weight value: %v", cluster.CanaryWeight))
}
if cluster.CanaryWeight > 0 && cluster.CanaryWeight <= 100 {
wp.weightFloor = up
up += val
up += cluster.CanaryWeight
if up > 100 {
logger.Errorf("[dubbo-go-pixiu] clusters' weight sum more than 100 in %v service!", cluster.Router)
continue
}
wp.weightCeil = up
}
Expand Down

0 comments on commit 964afa3

Please sign in to comment.