Skip to content

Commit

Permalink
ospp: Feature/traffic (apache#496)
Browse files Browse the repository at this point in the history
* refactor: TrafficFilter improve

* fix: constant declare with explicit type.

* fix: traffic.go follow by baerrewang's suggestion.

* traffic.go update

* fix: function name change in traffic filter.
  • Loading branch information
maxingg authored Sep 16, 2022
1 parent fceb45f commit 62e5a5d
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 72 deletions.
56 changes: 4 additions & 52 deletions pixiu/pkg/filter/traffic/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,61 +18,13 @@
package traffic

import (
"math/rand"
"net/http"
"strconv"
"time"
)

type splitAction func(header string, expectedValue string, r *http.Request) bool

type ActionWrapper struct {
name CanaryHeaders
action splitAction
}

type Actions struct {
wrappers []*ActionWrapper
}

var actions Actions

func initActions() {
// canaryByHeader
headerAction := &ActionWrapper{
name: canaryByHeader,
action: func(header string, value string, r *http.Request) bool {
if v := r.Header.Get(header); v != "" {
return v == value
}
return false
},
}

// canaryWeight
weightAction := &ActionWrapper{
name: canaryWeight,
action: func(header string, value string, r *http.Request) bool {
rand.Seed(time.Now().UnixNano())
num := rand.Intn(100) + 1
intValue, _ := strconv.Atoi(value)
return num <= intValue
},
}

actions.wrappers = append(actions.wrappers, headerAction)
actions.wrappers = append(actions.wrappers, weightAction)
func spiltHeader(req *http.Request, value string) bool {
return req.Header.Get(string(canaryByHeader)) == value
}

func spilt(request *http.Request, rules map[CanaryHeaders]*Cluster) *Cluster {
for _, wrapper := range actions.wrappers {
// call action, decide to return cluster or not
if cluster, exist := rules[wrapper.name]; exist {
header, value := cluster.Canary, cluster.Value
if wrapper.action(header, value, request) {
return cluster
}
}
}
return nil
func spiltWeight(weight, floor, ceil int) bool {
return weight >= floor && weight < ceil
}
98 changes: 78 additions & 20 deletions pixiu/pkg/filter/traffic/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
package traffic

import (
"fmt"
"math/rand"
"strconv"
"strings"
"time"
)

import (
Expand All @@ -30,7 +34,7 @@ import (

const (
canaryByHeader CanaryHeaders = "canary-by-header"
canaryWeight CanaryHeaders = "canary-weight"
unInitialize int = -1
)

func init() {
Expand All @@ -48,19 +52,27 @@ type (
}

Filter struct {
Rules map[CanaryHeaders]*Cluster
weight int
record map[string]struct{}
Rules []*ClusterWrapper
}

Config struct {
Traffics []*Cluster `yaml:"traffics" json:"traffics" mapstructure:"traffics"`
}

ClusterWrapper struct {
Cluster *Cluster
header string
weightCeil int
weightFloor int
}

Cluster struct {
Name string `yaml:"name" json:"name" mapstructure:"name"`
Router string `yaml:"router" json:"router" mapstructure:"router"`
Model CanaryHeaders `yaml:"model" json:"model" mapstructure:"model"` // canary model
Canary string `yaml:"canary" json:"canary" mapstructure:"canary"` // header key
Value string `yaml:"value" json:"value" mapstructure:"value"` // header value
Name string `yaml:"name" json:"name" mapstructure:"name"`
Router string `yaml:"router" json:"router" mapstructure:"router"`
CanaryByHeader string `yaml:"canary-by-header" json:"canary-by-header" mapstructure:"canary-by-header"`
CanaryWeight string `yaml:"canary-weight" json:"canary-weight" mapstructure:"canary-weight"`
}
)

Expand All @@ -79,41 +91,87 @@ func (factory *FilterFactory) Config() interface{} {
}

func (factory *FilterFactory) Apply() error {
initActions()
return nil
}

func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain filter.FilterChain) error {
f := &Filter{}
f.Rules = factory.rulesMatch(ctx.Request.RequestURI)
f := &Filter{
weight: unInitialize,
record: map[string]struct{}{},
}
f.Rules = factory.rulesMatch(f, ctx.Request.RequestURI)
chain.AppendDecodeFilters(f)
return nil
}

func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus {
if f.Rules != nil {
cluster := spilt(ctx.Request, f.Rules)
if cluster != nil {
ctx.Route.Cluster = cluster.Name
logger.Debugf("[dubbo-go-pixiu] execute traffic split to cluster %s", cluster.Name)
for _, wp := range f.Rules {
if f.traffic(wp, ctx) {
ctx.Route.Cluster = wp.Cluster.Name
logger.Debugf("[dubbo-go-pixiu] execute traffic split to cluster %s", wp.Cluster.Name)
break
}
}
} else {
logger.Warnf("[dubbo-go-pixiu] execute traffic split fail because of empty rules.")
}
return filter.Continue
}

func (factory *FilterFactory) rulesMatch(path string) map[CanaryHeaders]*Cluster {
func (f *Filter) traffic(c *ClusterWrapper, ctx *http.HttpContext) bool {
if f.weight == unInitialize {
rand.Seed(time.Now().UnixNano())
f.weight = rand.Intn(100) + 1
}

res := false
if c.header != "" {
res = spiltHeader(ctx.Request, c.header)
} else if !res && c.weightFloor != -1 && c.weightCeil != -1 {
res = spiltWeight(f.weight, c.weightFloor, c.weightCeil)
}
return res
}

func (factory *FilterFactory) rulesMatch(f *Filter, path string) []*ClusterWrapper {
clusters := factory.cfg.Traffics

if clusters != nil {
mp := make(map[CanaryHeaders]*Cluster)
for i := range clusters {
if strings.HasPrefix(clusters[i].Router, path) {
mp[clusters[i].Model] = clusters[i]
rules := make([]*ClusterWrapper, 0)
up := 0
for _, cluster := range clusters {
if strings.HasPrefix(path, cluster.Router) {
wp := &ClusterWrapper{
Cluster: cluster,
header: "",
weightCeil: -1,
weightFloor: -1,
}
if cluster.CanaryByHeader != "" {
if _, ok := f.record[cluster.CanaryByHeader]; ok {
logger.Errorf("Duplicate canary-by-header values")
} else {
f.record[cluster.CanaryByHeader] = struct{}{}
wp.header = cluster.CanaryByHeader
}
}
if cluster.CanaryWeight != "" {
val, err := strconv.Atoi(cluster.CanaryWeight)
if err != nil || val <= 0 {
logger.Errorf(fmt.Sprintf("Wrong canary-weight value: %v", cluster.CanaryWeight))
}
wp.weightFloor = up
up += val
if up > 100 {
logger.Errorf("[dubbo-go-pixiu] clusters' weight sum more than 100 in %v service!", cluster.Router)
}
wp.weightCeil = up
}
rules = append(rules, wp)
}
}
return mp
return rules
}
return nil
}

0 comments on commit 62e5a5d

Please sign in to comment.